Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c2ee054
Add `DelayedSubmissionData`
pentschev Nov 8, 2023
661092b
RequestTagMulti
pentschev Nov 8, 2023
78542d6
Expose tag mask to C++ and Python sync APIs
pentschev Nov 8, 2023
5263864
Add default tag mask value to Python core API
pentschev Nov 8, 2023
b007af6
Add `ucxx::TagMaskFull` constant
pentschev Nov 9, 2023
7b2e2d0
Update test to match latest cuDF changes
pentschev Nov 9, 2023
9b9b5f2
Merge remote-tracking branch 'upstream/branch-0.36' into tag-mask
pentschev Nov 27, 2023
a6be101
Merge remote-tracking branch 'upstream/branch-0.36' into tag-mask
pentschev Nov 29, 2023
745e6e6
Merge remote-tracking branch 'upstream/branch-0.36' into tag-mask
pentschev Dec 5, 2023
d706c32
Use strong C++ types `Tag` and `TagMask` instead of `ucp_tag_t`
pentschev Dec 6, 2023
9dac813
Add Python types for tags
pentschev Dec 6, 2023
5037e8f
Use `std::variant` for `ucxx::DelayedSubmissionData`
pentschev Dec 6, 2023
89e17a6
Add transfer direction to `ucxx::DelayedSubmission`
pentschev Dec 6, 2023
3d50c74
Fix exception's transfer direction
pentschev Dec 7, 2023
c8c656f
Replace `bool send` with new `TransferDirection`
pentschev Dec 7, 2023
43acf5b
Merge remote-tracking branch 'upstream/branch-0.36' into tag-mask
pentschev Dec 7, 2023
f673cdc
Add missing `types.py`
pentschev Dec 7, 2023
1537e1e
Expose `CallbackNotifier` via `ucxx/api.h`
pentschev Dec 7, 2023
e80759b
Use `std::variant` and `std::visit` to handle request data
pentschev Dec 7, 2023
565083e
Fix `request_data.h` include
pentschev Dec 8, 2023
167b5cf
Fix active message send
pentschev Dec 11, 2023
552133d
Zero-sized tag and active messages should be supported
pentschev Dec 11, 2023
c644dc6
Test zero-sized messages
pentschev Dec 11, 2023
fc63a0e
Use flag to copy AM send header
pentschev Dec 11, 2023
18288c8
Use `std::visit` return value to set `terminate`
pentschev Dec 11, 2023
8385ada
Define custom `TagPair` type in `request_tag_multi.cpp`
pentschev Dec 11, 2023
ffc1c10
Do not specify argument name where unneeded and use `decltype`
pentschev Dec 11, 2023
d4d44bb
Delete unintended `RequestData` default constructors
pentschev Dec 11, 2023
3558c0b
Use `std::variant` in constructors
pentschev Dec 11, 2023
a5742ac
Use `std::numeric_limits` to set `TagMaskFull` value
pentschev Dec 11, 2023
05fcb93
Remove unnecessary "by-reference" template
pentschev Dec 12, 2023
3593cb1
Remove typo in SPDX comment
pentschev Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ add_library(
src/log.cpp
src/request.cpp
src/request_am.cpp
src/request_data.cpp
src/request_helper.cpp
src/request_stream.cpp
src/request_tag.cpp
Expand Down
12 changes: 7 additions & 5 deletions cpp/benchmarks/perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ enum class ProgressMode {
enum transfer_type_t { SEND, RECV };

typedef std::unordered_map<transfer_type_t, std::vector<char>> BufferMap;
typedef std::unordered_map<transfer_type_t, ucp_tag_t> TagMap;
typedef std::unordered_map<transfer_type_t, ucxx::Tag> TagMap;

typedef std::shared_ptr<BufferMap> BufferMapPtr;
typedef std::shared_ptr<TagMap> TagMapPtr;
Expand Down Expand Up @@ -267,7 +267,8 @@ auto doTransfer(const app_context_t& app_context,
auto start = std::chrono::high_resolution_clock::now();
std::vector<std::shared_ptr<ucxx::Request>> requests = {
endpoint->tagSend((*bufferMap)[SEND].data(), app_context.message_size, (*tagMap)[SEND]),
endpoint->tagRecv((*bufferMap)[RECV].data(), app_context.message_size, (*tagMap)[RECV])};
endpoint->tagRecv(
(*bufferMap)[RECV].data(), app_context.message_size, (*tagMap)[RECV], ucxx::TagMaskFull)};

// Wait for requests and clear requests
waitRequests(app_context.progress_mode, worker, requests);
Expand All @@ -292,8 +293,8 @@ int main(int argc, char** argv)

bool is_server = app_context.server_addr == NULL;
auto tagMap = std::make_shared<TagMap>(TagMap{
{SEND, is_server ? 0 : 1},
{RECV, is_server ? 1 : 0},
{SEND, is_server ? ucxx::Tag{0} : ucxx::Tag{1}},
{RECV, is_server ? ucxx::Tag{1} : ucxx::Tag{0}},
});

std::shared_ptr<ListenerContext> listener_ctx;
Expand Down Expand Up @@ -337,7 +338,8 @@ int main(int argc, char** argv)
(*tagMap)[SEND]));
requests.push_back(endpoint->tagRecv((*wireupBufferMap)[RECV].data(),
(*wireupBufferMap)[RECV].size() * sizeof(int),
(*tagMap)[RECV]));
(*tagMap)[RECV],
ucxx::TagMaskFull));

// Wait for wireup requests and clear requests
waitRequests(app_context.progress_mode, worker, requests);
Expand Down
24 changes: 13 additions & 11 deletions cpp/examples/basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,25 +202,27 @@ int main(int argc, char** argv)

// Schedule small wireup messages to let UCX identify capabilities between endpoints
requests.push_back(listener_ctx->getEndpoint()->tagSend(
sendWireupBuffer.data(), sendWireupBuffer.size() * sizeof(int), 0));
requests.push_back(
endpoint->tagRecv(recvWireupBuffer.data(), sendWireupBuffer.size() * sizeof(int), 0));
sendWireupBuffer.data(), sendWireupBuffer.size() * sizeof(int), ucxx::Tag{0}));
requests.push_back(endpoint->tagRecv(recvWireupBuffer.data(),
sendWireupBuffer.size() * sizeof(int),
ucxx::Tag{0},
ucxx::TagMaskFull));
::waitRequests(progress_mode, worker, requests);
requests.clear();

// Schedule send and recv messages on different tags and different ordering
requests.push_back(listener_ctx->getEndpoint()->tagSend(
sendBuffers[0].data(), sendBuffers[0].size() * sizeof(int), 0));
sendBuffers[0].data(), sendBuffers[0].size() * sizeof(int), ucxx::Tag{0}));
requests.push_back(listener_ctx->getEndpoint()->tagRecv(
recvBuffers[1].data(), recvBuffers[1].size() * sizeof(int), 1));
recvBuffers[1].data(), recvBuffers[1].size() * sizeof(int), ucxx::Tag{1}, ucxx::TagMaskFull));
requests.push_back(listener_ctx->getEndpoint()->tagSend(
sendBuffers[2].data(), sendBuffers[2].size() * sizeof(int), 2));
requests.push_back(
endpoint->tagRecv(recvBuffers[2].data(), recvBuffers[2].size() * sizeof(int), 2));
requests.push_back(
endpoint->tagSend(sendBuffers[1].data(), sendBuffers[1].size() * sizeof(int), 1));
sendBuffers[2].data(), sendBuffers[2].size() * sizeof(int), ucxx::Tag{2}, ucxx::TagMaskFull));
requests.push_back(endpoint->tagRecv(
recvBuffers[2].data(), recvBuffers[2].size() * sizeof(int), ucxx::Tag{2}, ucxx::TagMaskFull));
requests.push_back(
endpoint->tagRecv(recvBuffers[0].data(), recvBuffers[0].size() * sizeof(int), 0));
endpoint->tagSend(sendBuffers[1].data(), sendBuffers[1].size() * sizeof(int), ucxx::Tag{1}));
requests.push_back(endpoint->tagRecv(
recvBuffers[0].data(), recvBuffers[0].size() * sizeof(int), ucxx::Tag{0}, ucxx::TagMaskFull));

// Wait for requests to be set, i.e., transfers complete
::waitRequests(progress_mode, worker, requests);
Expand Down
1 change: 1 addition & 0 deletions cpp/include/ucxx/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
#include <ucxx/request.h>
#include <ucxx/request_tag_multi.h>
#include <ucxx/typedefs.h>
#include <ucxx/utils/callback_notifier.h>
#include <ucxx/worker.h>
62 changes: 24 additions & 38 deletions cpp/include/ucxx/constructors.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <string>
#include <vector>

#include <ucxx/request_data.h>
#include <ucxx/typedefs.h>

namespace ucxx {
Expand Down Expand Up @@ -55,43 +56,28 @@ std::shared_ptr<Worker> createWorker(std::shared_ptr<Context> context,
const bool enableFuture);

// Transfers
std::shared_ptr<RequestAm> createRequestAmSend(std::shared_ptr<Endpoint> endpoint,
void* buffer,
size_t length,
ucs_memory_type_t memoryType,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestAm> createRequestAmRecv(std::shared_ptr<Endpoint> endpoint,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestStream> createRequestStream(std::shared_ptr<Endpoint> endpoint,
bool send,
void* buffer,
size_t length,
const bool enablePythonFuture);

std::shared_ptr<RequestTag> createRequestTag(std::shared_ptr<Component> endpointOrWorker,
bool send,
void* buffer,
size_t length,
ucp_tag_t tag,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestTagMulti> createRequestTagMultiSend(std::shared_ptr<Endpoint> endpoint,
const std::vector<void*>& buffer,
const std::vector<size_t>& size,
const std::vector<int>& isCUDA,
const ucp_tag_t tag,
const bool enablePythonFuture);

std::shared_ptr<RequestTagMulti> createRequestTagMultiRecv(std::shared_ptr<Endpoint> endpoint,
const ucp_tag_t tag,
const bool enablePythonFuture);
std::shared_ptr<RequestAm> createRequestAm(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::AmSend, data::AmReceive> requestData,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestStream> createRequestStream(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::StreamSend, data::StreamReceive> requestData,
const bool enablePythonFuture);

std::shared_ptr<RequestTag> createRequestTag(
std::shared_ptr<Component> endpointOrWorker,
const std::variant<data::TagSend, data::TagReceive> requestData,
const bool enablePythonFuture,
RequestCallbackUserFunction callbackFunction,
RequestCallbackUserData callbackData);

std::shared_ptr<RequestTagMulti> createRequestTagMulti(
std::shared_ptr<Endpoint> endpoint,
const std::variant<data::TagMultiSend, data::TagMultiReceive> requestData,
const bool enablePythonFuture);

} // namespace ucxx
43 changes: 5 additions & 38 deletions cpp/include/ucxx/delayed_submission.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,56 +7,23 @@
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include <ucp/api/ucp.h>
#include <ucs/memory/memory_type.h>

#include <ucxx/log.h>
#include <ucxx/request_data.h>

namespace ucxx {

typedef std::function<void()> DelayedSubmissionCallbackType;

/**
* @brief Parameters of a transfer operation whose submission is delayed.
*
* Holds the direction, buffer, length, tag and memory type passed to the constructor
* declared below for a transfer that is not submitted immediately.
*/
class DelayedSubmission {
public:
bool _send{false}; ///< Whether this is a send (`true`) operation or recv (`false`)
void* _buffer{nullptr}; ///< Raw pointer to data buffer
size_t _length{0}; ///< Length of the message in bytes
ucp_tag_t _tag{0}; ///< Tag to match
ucs_memory_type_t _memoryType{UCS_MEMORY_TYPE_UNKNOWN}; ///< Buffer memory type

// Default construction is disabled; use the parameterized constructor below.
DelayedSubmission() = delete;

/**
* @brief Constructor for a delayed submission operation.
*
* Construct a delayed submission operation. Delayed submission means that a transfer
* operation will not be submitted immediately, but will rather be delayed until the next
* progress iteration.
*
* This may be useful to avoid executing any transfer operations directly in the
* application thread, delaying all of them for the worker progress thread when enabled.
* With this approach any perceived overhead will be removed from the application thread,
* and thus provide some speedup in certain situations. It may also be useful to prevent
* a multi-threaded application from blocking while waiting for the UCX spinlock, since
* all transfer operations may be pushed to the worker progress thread.
*
* @param[in] send whether this is a send (`true`) or receive (`false`) operation.
* @param[in] buffer a raw pointer to the data being transferred.
* @param[in] length the size in bytes of the message being transferred.
* @param[in] tag tag to match for this operation (only applies to tag
* operations).
* @param[in] memoryType the memory type of the buffer.
*/
DelayedSubmission(const bool send,
void* buffer,
const size_t length,
const ucp_tag_t tag = 0,
const ucs_memory_type_t memoryType = UCS_MEMORY_TYPE_UNKNOWN);
};

template <typename T>
class BaseDelayedSubmissionCollection {
protected:
Expand Down
13 changes: 9 additions & 4 deletions cpp/include/ucxx/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ class Endpoint : public Component {
*/
std::shared_ptr<Request> tagSend(void* buffer,
size_t length,
ucp_tag_t tag,
Tag tag,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);
Expand All @@ -408,6 +408,7 @@ class Endpoint : public Component {
* data will be stored.
* @param[in] length the size in bytes of the tag message to be received.
* @param[in] tag the tag to match.
* @param[in] tagMask the tag mask to use.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
* @param[in] callbackFunction user-defined callback function to call upon completion.
Expand All @@ -417,7 +418,8 @@ class Endpoint : public Component {
*/
std::shared_ptr<Request> tagRecv(void* buffer,
size_t length,
ucp_tag_t tag,
Tag tag,
TagMask tagMask,
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);
Expand Down Expand Up @@ -460,7 +462,7 @@ class Endpoint : public Component {
std::shared_ptr<Request> tagMultiSend(const std::vector<void*>& buffer,
const std::vector<size_t>& size,
const std::vector<int>& isCUDA,
const ucp_tag_t tag,
const Tag tag,
const bool enablePythonFuture);

/**
Expand All @@ -479,12 +481,15 @@ class Endpoint : public Component {
* ensure the transfer has completed. Requires UCXX Python support.
*
* @param[in] tag the tag to match.
* @param[in] tagMask the tag mask to use.
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
*
* @returns Request to be subsequently checked for the completion and its state.
*/
std::shared_ptr<Request> tagMultiRecv(const ucp_tag_t tag, const bool enablePythonFuture);
std::shared_ptr<Request> tagMultiRecv(const Tag tag,
const TagMask tagMask,
const bool enablePythonFuture);

/**
* @brief Get `ucxx::Worker` component from a worker or listener object.
Expand Down
8 changes: 4 additions & 4 deletions cpp/include/ucxx/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <ucxx/component.h>
#include <ucxx/endpoint.h>
#include <ucxx/future.h>
#include <ucxx/request_data.h>
#include <ucxx/typedefs.h>

#define ucxx_trace_req_f(_owner, _req, _name, _message, ...) \
Expand All @@ -34,9 +35,8 @@ class Request : public Component {
std::shared_ptr<Endpoint> _endpoint{
nullptr}; ///< Endpoint that generated request (if not from worker)
std::string _ownerString{
"undetermined owner"}; ///< String to print owner (endpoint or worker) when logging
std::shared_ptr<DelayedSubmission> _delayedSubmission{
nullptr}; ///< The submission object that will dispatch the request
"undetermined owner"}; ///< String to print owner (endpoint or worker) when logging
data::RequestData _requestData{}; ///< The operation-specific data to be used in the request
std::string _operationName{
"request_undefined"}; ///< Human-readable operation name, mostly used for log messages
std::recursive_mutex _mutex{}; ///< Mutex to prevent checking status while it's being set
Expand All @@ -62,7 +62,7 @@ class Request : public Component {
* subsequently notified.
*/
Request(std::shared_ptr<Component> endpointOrWorker,
std::shared_ptr<DelayedSubmission> delayedSubmission,
const data::RequestData requestData,
const std::string operationName,
const bool enablePythonFuture = false);

Expand Down
Loading