diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 654f7a45ae..b91cac4627 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -15,6 +15,10 @@ raja_add_executable( NAME resource-forall SOURCES resource-forall.cpp) +raja_add_executable( + NAME messages-forall + SOURCES messages-forall.cpp) + raja_add_executable( NAME dynamic-forall SOURCES dynamic-forall.cpp) diff --git a/examples/messages-forall.cpp b/examples/messages-forall.cpp new file mode 100644 index 0000000000..ced7377216 --- /dev/null +++ b/examples/messages-forall.cpp @@ -0,0 +1,464 @@ +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +// Copyright (c) 2016-25, Lawrence Livermore National Security, LLC +// and RAJA project contributors. See the RAJA/LICENSE file for details. +// +// SPDX-License-Identifier: (BSD-3-Clause) +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// + +#include +#include +#include + +#include "memoryManager.hpp" + +#include "RAJA/RAJA.hpp" +#include "RAJA/util/resource.hpp" + +/* + * Vector Addition Example + * + * Computes c = a + b, where a, b, c are vectors of ints. + * It illustrates similarities between a C-style for-loop and a RAJA + * forall loop with using RAJA messages to print out if a value is + * negative. + * + * Note: this example is the same as resource-forall.cpp with + * some additional logger to store how the message handler can + * be used as a basic logger for both host and device. + * + * RAJA features shown: + * - `forall` loop iteration with messages + * - Create message handler with Resource argument + * + */ + +// This is a simplified example fixed string to show how +// custom types can be used with the message queue. +template +class my_string +{ +public: + char m_data[N]; + + my_string(const char* str) + { + if (str == NULL) { return; } + std::size_t i = 0; + for (i = 0; *str != '\0' && i < N; str++, i++) { + m_data[i] = *str; + } + + std::size_t len = (i < N) ? i : N-1; + m_data[len] = '\0'; + } + + const char* c_str() const + { + return m_data; + } +}; + +// +// Functions for checking and printing results +// +void checkResult(int* res, int len); +void printResult(int* res, int len); + + +int main(int RAJA_UNUSED_ARG(argc), char **RAJA_UNUSED_ARG(argv[])) +{ + + std::cout << "\n\nRAJA vector addition example...\n"; + + RAJA::resources::Host host{}; + +// +// Define number of messages that can be stored +// + const std::size_t num_messages = 1; + const std::size_t message_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args, int*, int, int>)); + + const std::size_t buf_sz = num_messages*message_sz; + +// +// Allocate and initialize message handler and queue +// + auto logger = RAJA::make_message_manager(buf_sz, host); + auto cpu_msg_queue = logger.get_queue(0, + [](const my_string<128>& str, int* ptr, int idx, int value) { + std::cout << "\n " << str.c_str() << " " << ptr << " a[" << idx << "] = " << value << "\n"; + } + ); + +// +// Define vector length +// + const int N = 100000; + +// +// Allocate and initialize vector data +// + + int *a = host.allocate(N); + int *b = host.allocate(N); + int *c = host.allocate(N); + + int *a_ = host.allocate(N); + int *b_ = host.allocate(N); + int *c_ = host.allocate(N); + + + for (int i = 0; i < N; ++i) { + a[i] = -i; + b[i] = 2 * i; + a_[i] = -i; + b_[i] = 2 * i; + + } + + +//----------------------------------------------------------------------------// + + std::cout << "\n Running C-style vector addition...\n"; + + for (int i = 0; i < N; ++i) { + if (a[i] < 0) { + cpu_msg_queue.try_post_message("message from C-style loop", a, i, a[i]); + } + c[i] = a[i] + b[i]; + } + + checkResult(c, N); + logger.wait_all(); + + +//----------------------------------------------------------------------------// +// RAJA::seq_exec policy enforces sequential execution.... +//----------------------------------------------------------------------------// + + std::cout << "\n Running RAJA sequential vector addition...\n"; + + + RAJA::forall(host, RAJA::RangeSegment(0, N), [=] (int i) { + if (a[i] < 0) { + cpu_msg_queue.try_post_message("message from RAJA seq_exec loop", a, i, a[i]); + } + c[i] = a[i] + b[i]; + }); + + checkResult(c, N); + logger.wait_all(); + +//----------------------------------------------------------------------------// +// RAJA::sind_exec policy enforces simd execution.... +//----------------------------------------------------------------------------// + + std::cout << "\n Running RAJA simd_exec vector addition...\n"; + + RAJA::forall(host, RAJA::RangeSegment(0, N), [=] (int i) { + c[i] = a[i] + b[i]; + }); + + checkResult(c, N); + +#if defined(RAJA_ENABLE_OPENMP) +//----------------------------------------------------------------------------// +// RAJA::omp_for_parallel_exec policy execution.... +//----------------------------------------------------------------------------// + + std::cout << "\n Running RAJA omp_parallel_for_exec vector addition...\n"; + + RAJA::forall(host, RAJA::RangeSegment(0, N), + [=] (int i) { + if (a[i] < 0) { + cpu_msg_queue.try_post_message("message from RAJA omp_parallel_for_exec loop", a, i, a[i]); + } + c[i] = a[i] + b[i]; + }); + + checkResult(c, N); + logger.wait_all(); + +//----------------------------------------------------------------------------// +// RAJA::omp_parallel_for_static_exec policy execution.... +//----------------------------------------------------------------------------// + + std::cout << "\n Running RAJA omp_parallel_for_static_exec (default chunksize) vector addition...\n"; + + RAJA::forall>(host, RAJA::RangeSegment(0, N), + [=] (int i) { + if (a[i] < 0) { + cpu_msg_queue.try_post_message("message from RAJA omp_parallel_for_static_exec loop", a, i, a[i]); + } + c[i] = a[i] + b[i]; + }); + + checkResult(c, N); + logger.wait_all(); + +//----------------------------------------------------------------------------// +// RAJA::omp_parallel_for_dynamic_exec policy execution.... +//----------------------------------------------------------------------------// + + std::cout << "\n Running RAJA omp_for_dynamic_exec (chunksize = 16) vector addition...\n"; + + RAJA::forall>(host, RAJA::RangeSegment(0, N), + [=] (int i) { + if (a[i] < 0) { + cpu_msg_queue.try_post_message("message from RAJA omp_parallel_for_dynamic_exec<16> loop", a, i, a[i]); + } + c[i] = a[i] + b[i]; + }); + + checkResult(c, N); + logger.wait_all(); +#endif + + + +#if defined(RAJA_ENABLE_CUDA) || defined(RAJA_ENABLE_HIP) || defined(RAJA_ENABLE_SYCL) + +/* + GPU_BLOCK_SIZE - specifies the number of threads in a CUDA/HIP thread block +*/ +const int GPU_BLOCK_SIZE = 256; + +//----------------------------------------------------------------------------// +// RAJA::cuda/hip_exec policy execution.... +//----------------------------------------------------------------------------// +{ + std::cout << "\n Running RAJA GPU vector addition on 2 seperate streams...\n"; +#if defined(RAJA_ENABLE_CUDA) + RAJA::resources::Cuda res_gpu1; + RAJA::resources::Cuda res_gpu2; + using EXEC_POLICY = RAJA::cuda_exec_async; +#elif defined(RAJA_ENABLE_HIP) + RAJA::resources::Hip res_gpu1; + RAJA::resources::Hip res_gpu2; + using EXEC_POLICY = RAJA::hip_exec_async; +#elif defined(RAJA_ENABLE_SYCL) + RAJA::resources::Sycl res_gpu1; + RAJA::resources::Sycl res_gpu2; + using EXEC_POLICY = RAJA::sycl_exec; +#endif + auto gpu_logger1 = RAJA::make_message_manager(buf_sz, res_gpu1); + auto gpu_msg_queue1 = gpu_logger1.get_queue(0, + [](const my_string<128>& str, int* ptr, int idx, int value) { + std::cout << "\n " << str.c_str() << " " << ptr << " a[" << idx << "] = " << value << "\n"; + } + ); + + auto gpu_logger2 = RAJA::make_message_manager(buf_sz, res_gpu2); + auto gpu_msg_queue2 = gpu_logger2.get_queue(0, + [](const my_string<128>& str, int* ptr, int idx, int value) { + std::cout << "\n " << str.c_str() << " " << ptr << " a[" << idx << "] = " << value << "\n"; + } + ); + + int* d_a1 = res_gpu1.allocate(N); + int* d_b1 = res_gpu1.allocate(N); + int* d_c1 = res_gpu1.allocate(N); + + int* d_a2 = res_gpu2.allocate(N); + int* d_b2 = res_gpu2.allocate(N); + int* d_c2 = res_gpu2.allocate(N); + + res_gpu1.memcpy(d_a1, a, sizeof(int)* N); + res_gpu1.memcpy(d_b1, b, sizeof(int)* N); + + res_gpu2.memcpy(d_a2, a, sizeof(int)* N); + res_gpu2.memcpy(d_b2, b, sizeof(int)* N); + + + RAJA::forall(res_gpu1, RAJA::RangeSegment(0, N), + [=] RAJA_DEVICE (int i) { + if (d_a1[i] < 0) { + gpu_msg_queue1.try_post_message("message from GPU stream 1: pointer =", d_a1, i, d_a1[i]); + } + d_c1[i] = d_a1[i] + d_b1[i]; + }); + + RAJA::forall(res_gpu2, RAJA::RangeSegment(0, N), + [=] RAJA_DEVICE (int i) { + if (d_a2[i] < 0) { + gpu_msg_queue2.try_post_message("message from GPU stream 2: pointer =", d_a2, i, d_a2[i]); + } + d_c2[i] = d_a2[i] + d_b2[i]; + }); + + res_gpu1.memcpy(c, d_c1, sizeof(int)*N ); + + res_gpu2.memcpy(c_, d_c2, sizeof(int)*N ); + + checkResult(c, N); + checkResult(c_, N); + + gpu_logger1.wait_all(); + gpu_logger2.wait_all(); + + res_gpu1.deallocate(d_a1); + res_gpu1.deallocate(d_b1); + res_gpu1.deallocate(d_c1); + + res_gpu2.deallocate(d_a2); + res_gpu2.deallocate(d_b2); + res_gpu2.deallocate(d_c2); +} + + +//----------------------------------------------------------------------------// +// RAJA::cuda/hip_exec policy with waiting event.... +//----------------------------------------------------------------------------// +{ + std::cout << "\n Running RAJA GPU vector with dependency between two seperate streams...\n"; +#if defined(RAJA_ENABLE_CUDA) + // _raja_res_defres_start + RAJA::resources::Cuda res_gpu1; + RAJA::resources::Cuda res_gpu2; + RAJA::resources::Host res_host; + + using EXEC_POLICY = RAJA::cuda_exec_async; + // _raja_res_defres_end +#elif defined(RAJA_ENABLE_HIP) + RAJA::resources::Hip res_gpu1; + RAJA::resources::Hip res_gpu2; + RAJA::resources::Host res_host; + + using EXEC_POLICY = RAJA::hip_exec_async; +#elif defined(RAJA_ENABLE_SYCL) + RAJA::resources::Sycl res_gpu1; + RAJA::resources::Sycl res_gpu2; + RAJA::resources::Host res_host; + + using EXEC_POLICY = RAJA::sycl_exec; +#endif + auto gpu_logger1 = RAJA::make_message_manager(buf_sz, res_gpu1); + auto gpu_msg_queue1 = gpu_logger1.get_queue(0, + [](int* ptr, int idx, int value) { + std::cout << "\n gpu stream 1: pointer (" << ptr << ") d_array1[" << idx << "] = " << value << "\n"; + } + ); + + auto gpu_logger2 = RAJA::make_message_manager(buf_sz, res_gpu2); + auto gpu_msg_queue2 = gpu_logger2.get_queue(0, + [](int* ptr, int idx, int value) { + std::cout << "\n gpu stream 2: pointer (" << ptr << ") d_array2[" << idx << "] = " << value << "\n"; + } + ); + + // _raja_res_alloc_start + int* d_array1 = res_gpu1.allocate(N); + int* d_array2 = res_gpu2.allocate(N); + int* h_array = res_host.allocate(N); + // _raja_res_alloc_end + + // _raja_res_k1_start + RAJA::forall(res_gpu1, RAJA::RangeSegment(0,N), + [=] RAJA_HOST_DEVICE (int i) { + d_array1[i] = i; + gpu_msg_queue1.try_post_message(d_array1, i, d_array1[i]); + } + ); + // _raja_res_k1_end + + // Log message for stream 1 + gpu_logger1.wait_all(); + + // _raja_res_k2_start + RAJA::resources::Event e = RAJA::forall(res_gpu2, RAJA::RangeSegment(0,N), + [=] RAJA_HOST_DEVICE (int i) { + d_array2[i] = -1; + gpu_msg_queue2.try_post_message(d_array2, i, d_array2[i]); + } + ); + // _raja_res_k2_end + + // _raja_res_wait_start + res_gpu2.wait_for(&e); + // _raja_res_wait_end + + // _raja_res_k3_start + RAJA::forall(res_gpu1, RAJA::RangeSegment(0,N), + [=] RAJA_HOST_DEVICE (int i) { + d_array1[i] *= d_array2[i]; + gpu_msg_queue1.try_post_message(d_array1, i, d_array1[i]); + } + ); + // _raja_res_k3_end + + // Log message for stream 2 + gpu_logger2.wait_all(); + + // _raja_res_memcpy_start + res_gpu1.memcpy(h_array, d_array1, sizeof(int) * N); + // _raja_res_memcpy_end + + // Log message for stream 1 + gpu_logger1.wait_all(); + + // _raja_res_k4_start + bool check = true; + RAJA::forall(res_host, RAJA::RangeSegment(0,N), + [&check, h_array] (int i) { + if(h_array[i] != -i) {check = false;} + } + ); + // _raja_res_k4_end + + std::cout << "\n result -- "; + if (check) std::cout << "PASS\n"; + else std::cout << "FAIL\n"; + + + res_gpu1.deallocate(d_array1); + res_gpu2.deallocate(d_array2); + res_host.deallocate(h_array); + +} + +#endif +// +// +// Clean up. +// + host.deallocate(a); + host.deallocate(b); + host.deallocate(c); + + host.deallocate(a_); + host.deallocate(b_); + host.deallocate(c_); + + std::cout << "\n DONE!...\n"; + + return 0; +} + +// +// Function to check result and report P/F. +// +void checkResult(int* res, int len) +{ + bool correct = true; + for (int i = 0; i < len; i++) { + if ( res[i] != i ) { correct = false; } + } + if ( correct ) { + std::cout << "\n\t result -- PASS\n"; + } else { + std::cout << "\n\t result -- FAIL\n"; + } +} + +// +// Function to print result. +// +void printResult(int* res, int len) +{ + std::cout << std::endl; + for (int i = 0; i < len; i++) { + std::cout << "result[" << i << "] = " << res[i] << std::endl; + } + std::cout << std::endl; +} diff --git a/include/RAJA/RAJA.hpp b/include/RAJA/RAJA.hpp index abc9f30738..3ff3136807 100644 --- a/include/RAJA/RAJA.hpp +++ b/include/RAJA/RAJA.hpp @@ -125,6 +125,11 @@ // #include "RAJA/util/Span.hpp" +// +// Message handler to pass messages between host and device +// +#include "RAJA/util/messages.hpp" + // // zip iterator to iterator over sequences simultaneously // diff --git a/include/RAJA/policy/msg_queue.hpp b/include/RAJA/policy/msg_queue.hpp new file mode 100644 index 0000000000..894de8cf5d --- /dev/null +++ b/include/RAJA/policy/msg_queue.hpp @@ -0,0 +1,24 @@ +/*! + ****************************************************************************** + * + * \file + * + * \brief Header file containing RAJA headers for message queue policies. + * + ****************************************************************************** + */ + +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +// Copyright (c) 2016-25, Lawrence Livermore National Security, LLC +// and RAJA project contributors. See the RAJA/LICENSE file for details. +// +// SPDX-License-Identifier: (BSD-3-Clause) +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// + +#ifndef RAJA_msg_queue_HPP +#define RAJA_msg_queue_HPP + +#include "RAJA/policy/msg_queue/mpsc_queue.hpp" +#include "RAJA/policy/msg_queue/spsc_queue.hpp" + +#endif // closing endif for header file include guard diff --git a/include/RAJA/policy/msg_queue/mpsc_queue.hpp b/include/RAJA/policy/msg_queue/mpsc_queue.hpp new file mode 100644 index 0000000000..ceb4c13b66 --- /dev/null +++ b/include/RAJA/policy/msg_queue/mpsc_queue.hpp @@ -0,0 +1,127 @@ +/*! + ****************************************************************************** + * + * \file + * + * \brief Header file containing implementation for a MPSC + * message queue policy. By SPSC, means multi-producer + * single-consumer. In other words, messages produced + * could be from multiple thread may require atomics. + * + ****************************************************************************** + */ + +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +// Copyright (c) 2016-25, Lawrence Livermore National Security, LLC +// and RAJA project contributors. See the RAJA/LICENSE file for details. +// +// SPDX-License-Identifier: (BSD-3-Clause) +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// + +#ifndef RAJA_mpsc_queue_HPP +#define RAJA_mpsc_queue_HPP + +#include "RAJA/util/concepts.hpp" +#include "RAJA/pattern/atomic.hpp" + +#include "RAJA/util/msg_header.hpp" +#include "RAJA/policy/msg_queue/policy.hpp" + +namespace RAJA +{ +namespace messages +{ + +template +class queue> +{ +public: + using policy = RAJA::mpsc_queue; + + using args_type = camp::tuple; + using size_type = typename Container::size_type; + + queue(int id, Container& container) : m_id {id}, m_container {&container} {} + + queue(int id, Container* container) : m_id {id}, m_container {container} {} + + /// Posts message to queue. This is marked `const` to pass to lambda by + /// copy. This throws away messages that are over the capacity of the + /// container. + template + RAJA_HOST_DEVICE bool try_post_message(Ts&&... args) const + { + if (m_container != nullptr) + { + constexpr size_type header_sz = align(sizeof(msg_header)); + constexpr size_type args_sz = align(sizeof(msg_args)); + constexpr size_type msg_sz = header_sz + args_sz; + auto local_size = + RAJA::atomicAdd(&(m_container->m_end), msg_sz); + if (m_container->m_data != nullptr && + local_size + msg_sz <= m_container->m_capacity) + { + char* buf = m_container->m_data + local_size; + new (buf) msg_header {args_sz, m_id, buf + header_sz}; + new (buf + header_sz) + msg_args {args_type(std::forward(args)...)}; + + // Actual size of buffer used + RAJA::atomicAdd(&(m_container->m_size), msg_sz); + return true; + } + } + + return false; + } + +private: + int m_id; + Container* m_container; +}; + +// TODO: turning off for now +// need to relook at logic +#if 0 +template +class queue +{ +public: + using policy = RAJA::mpsc_queue_overwrite; + + using value_type = typename Container::value_type; + using size_type = typename Container::size_type; + + queue(Container& container) : m_container {&container} {} + + queue(Container* container) : m_container {container} {} + + /// Posts message to queue. This is marked `const` to pass to lambda by + /// copy. This overwrites previously stored messages once the number of + /// messages are over the capacity of the container. + template + RAJA_HOST_DEVICE bool try_post_message(Ts&&... args) const + { + if (m_container != nullptr) + { + auto local_size = RAJA::atomicInc(&(m_container->m_size)); + if (m_container->m_data != nullptr) + { + m_container->m_data[local_size % m_container->m_capacity] = + value_type(std::forward(args)...); + return true; + } + } + + return false; + } + +private: + Container* m_container; +}; +#endif + +} // namespace messages +} // namespace RAJA + +#endif // closing endif for header file include guard diff --git a/include/RAJA/policy/msg_queue/policy.hpp b/include/RAJA/policy/msg_queue/policy.hpp new file mode 100644 index 0000000000..c932fd7140 --- /dev/null +++ b/include/RAJA/policy/msg_queue/policy.hpp @@ -0,0 +1,71 @@ +/*! + ****************************************************************************** + * + * \file + * + * \brief Header file containing RAJA message queue policy definitions. + * + ****************************************************************************** + */ + +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +// Copyright (c) 2016-25, Lawrence Livermore National Security, LLC +// and RAJA project contributors. See the RAJA/LICENSE file for details. +// +// SPDX-License-Identifier: (BSD-3-Clause) +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// + +#ifndef policy_msg_queue_HPP +#define policy_msg_queue_HPP + +#include "RAJA/policy/PolicyBase.hpp" + +namespace RAJA +{ +namespace messages +{ + +/// +/// This is a view-like queue so that message queues can be copied to kernels. +/// +template +class queue; + +} // namespace messages + +namespace policy +{ +namespace messages +{ + +// +////////////////////////////////////////////////////////////////////// +// +// Queue policies +// +////////////////////////////////////////////////////////////////////// +// + +template +struct mpsc_queue +{ + static constexpr bool should_overwrite = Overwrite; +}; + +template +struct spsc_queue +{ + static constexpr bool should_overwrite = Overwrite; +}; + +} // namespace messages +} // namespace policy + +using spsc_queue = policy::messages::spsc_queue; +using spsc_queue_overwrite = policy::messages::spsc_queue; +using mpsc_queue = policy::messages::mpsc_queue; +using mpsc_queue_overwrite = policy::messages::mpsc_queue; + +} // namespace RAJA + +#endif diff --git a/include/RAJA/policy/msg_queue/spsc_queue.hpp b/include/RAJA/policy/msg_queue/spsc_queue.hpp new file mode 100644 index 0000000000..5de86161fc --- /dev/null +++ b/include/RAJA/policy/msg_queue/spsc_queue.hpp @@ -0,0 +1,127 @@ +/*! + ****************************************************************************** + * + * \file + * + * \brief Header file containing implementation for a SPSC + * message queue policy. By SPSC, means single-producer + * single-consumer. In other words, messages will be + * produced from one thread and no atomics needed. + * + ****************************************************************************** + */ + +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +// Copyright (c) 2016-25, Lawrence Livermore National Security, LLC +// and RAJA project contributors. See the RAJA/LICENSE file for details. +// +// SPDX-License-Identifier: (BSD-3-Clause) +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// + +#ifndef RAJA_spsc_queue_HPP +#define RAJA_spsc_queue_HPP + +#include "RAJA/util/concepts.hpp" +#include "RAJA/pattern/atomic.hpp" + +#include "RAJA/util/msg_header.hpp" +#include "RAJA/policy/msg_queue/policy.hpp" + +namespace RAJA +{ +namespace messages +{ + +template +class queue> +{ +public: + using policy = RAJA::spsc_queue; + + using args_type = camp::tuple; + using size_type = typename Container::size_type; + + queue(int id, Container& container) : m_id {id}, m_container {&container} {} + + queue(int id, Container* container) : m_id {id}, m_container {container} {} + + /// Posts message to queue. This is marked `const` to pass to lambda by + /// copy. This throws away messages that are over the capacity of the + /// container. + template + bool try_post_message(Ts&&... args) const + { + if (m_container != nullptr) + { + constexpr size_type header_sz = align(sizeof(msg_header)); + constexpr size_type args_sz = align(sizeof(msg_args)); + constexpr size_type msg_sz = header_sz + args_sz; + auto local_size = m_container->m_end; + m_container->m_end += msg_sz; + if (m_container->m_data != nullptr && + local_size + msg_sz <= m_container->m_capacity) + { + char* buf = m_container->m_data + local_size; + new (buf) msg_header {args_sz, m_id, buf + header_sz}; + new (buf + header_sz) + msg_args {args_type(std::forward(args)...)}; + + // Actual size of buffer used + m_container->m_size += msg_sz; + return true; + } + } + + return false; + } + +private: + int m_id; + Container* m_container; +}; + +// TODO: turning off for now +// need to relook at logic +#if 0 +template +class queue +{ +public: + using policy = RAJA::spsc_queue_overwrite; + + using value_type = typename Container::value_type; + using size_type = typename Container::size_type; + + queue(Container& container) : m_container {&container} {} + + queue(Container* container) : m_container {container} {} + + /// Posts message to queue. This is marked `const` to pass to lambda by + /// copy. This overwrites previously stored messages once the number of + /// messages are over the capacity of the container. + template + bool try_post_message(Ts&&... args) const + { + if (m_container != nullptr) + { + auto local_size = m_container->m_size++; + if (m_container->m_data != nullptr) + { + m_container->m_data[local_size % m_container->m_capacity] = + value_type(std::forward(args)...); + return true; + } + } + + return false; + } + +private: + Container* m_container; +}; +#endif + +} // namespace messages +} // namespace RAJA + +#endif // closing endif for header file include guard diff --git a/include/RAJA/util/messages.hpp b/include/RAJA/util/messages.hpp new file mode 100644 index 0000000000..9ff3952f56 --- /dev/null +++ b/include/RAJA/util/messages.hpp @@ -0,0 +1,332 @@ +/*! + ****************************************************************************** + * + * \file + * + * \brief RAJA header file defining a GPU to CPU message handler class. + * + ****************************************************************************** + */ + +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +// Copyright (c) 2016-25, Lawrence Livermore National Security, LLC +// and RAJA project contributors. See the RAJA/LICENSE file for details. +// +// SPDX-License-Identifier: (BSD-3-Clause) +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// + +#ifndef RAJA_MESSAGES_HPP +#define RAJA_MESSAGES_HPP + +#include +#include +#include + +#include "RAJA/util/msg_header.hpp" +#include "RAJA/policy/msg_queue.hpp" + +#include "camp/resource.hpp" + +namespace RAJA +{ +/// +/// Owning wrapper for a message queue. This is used for ownership +/// of the message queue and is a move-only class. For getting a view-like +/// class, use the `get_queue` member function, which allows copying. +/// +template +class message_bus; + +/// +/// Specialized case from message bus. +/// This will store a msg_header and arguments in a char* buffer. These +/// are later reinterpretted to the correct message arguments. +/// +template<> +class message_bus +{ +private: + // Internal classes + struct queue + { + using value_type = char; + using size_type = unsigned long long; + using pointer = value_type*; + using const_pointer = const value_type*; + using iterator = value_type*; + using const_iterator = const value_type*; + + size_type m_begin {0}; + size_type m_end {0}; + size_type m_capacity {0}; + size_type m_size {0}; + pointer m_data {nullptr}; + }; + + struct msg_iterator + { + using value_type = char; + using pointer = value_type*; + using reference = value_type&; + using difference_type = std::ptrdiff_t; + using iterator_categroy = std::forward_iterator_tag; + + msg_iterator(pointer ptr) : cur_ptr(ptr) {} + + msg_header& operator*() const + { + return *std::launder(reinterpret_cast(cur_ptr)); + } + + msg_header* operator->() const + { + return std::launder(reinterpret_cast(cur_ptr)); + } + + msg_iterator& operator++() + { + msg_header& msg = *std::launder(reinterpret_cast(cur_ptr)); + cur_ptr += msg.sz + align(sizeof(msg_header)); + + return (*this); + } + + msg_iterator operator++(int) + { + msg_iterator temp = *this; + ++(*this); + return temp; + } + + bool operator==(const msg_iterator& other) const + { + return (cur_ptr == other.cur_ptr); + } + + bool operator!=(const msg_iterator& other) const + { + return !(*this == other); + } + + private: + pointer cur_ptr; + }; + + struct resource_deleter + { + public: + using resource_type = camp::resources::Resource; + + template + resource_deleter(Resource res) : m_res {res} + {} + + void operator()(queue* ptr) + { + m_res.wait(); + ptr->~queue(); + m_res.deallocate(ptr, camp::resources::MemoryAccess::Pinned); + } + + private: + resource_type m_res; + }; + +public: + using value_type = char; + using size_type = unsigned long long; + using pointer = value_type*; + using const_pointer = const value_type*; + using iterator = msg_iterator; + using const_iterator = const iterator; + using resource_type = resource_deleter::resource_type; + + message_bus() + : m_res {camp::resources::Host()}, + m_bus {new (m_res.allocate( + 1, + camp::resources::MemoryAccess::Pinned)) queue {}, + resource_deleter {m_res}} + {} + + template + message_bus(Resource res) + : m_res {res}, + m_bus {new (m_res.allocate( + 1, + camp::resources::MemoryAccess::Pinned)) queue {}, + resource_deleter {m_res}} + {} + + template + message_bus(const size_type bus_sz, Resource res) : message_bus {res} + { + reserve(bus_sz); + } + + ~message_bus() { reset(); } + + // Copy ctor/operator + message_bus(const message_bus&) = delete; + message_bus& operator=(const message_bus&) = delete; + + // Move ctor/operator + message_bus(message_bus&&) = default; + message_bus& operator=(message_bus&&) = default; + + void reserve(size_type bus_sz) + { + reset(); + m_bus->m_data = m_res.allocate( + bus_sz, camp::resources::MemoryAccess::Pinned); + m_bus->m_capacity = bus_sz; + } + + void reset() + { + // Verify that queue is not in use + if (m_bus->m_data != nullptr) + { + m_res.wait(); + m_res.deallocate(m_bus->m_data, camp::resources::MemoryAccess::Pinned); + m_bus->m_data = nullptr; + } + m_bus->m_capacity = 0; + m_bus->m_size = 0; + m_bus->m_end = 0; + m_bus->m_begin = 0; + } + + bool has_pending_messages() { return get_num_pending_messages() != 0; } + + size_type get_num_pending_messages() + { + m_res.wait(); + return m_bus->m_size; + } + + void clear_messages() + { + m_res.wait(); + m_bus->m_size = 0; + m_bus->m_end = 0; + m_bus->m_begin = 0; + } + + template + auto get_queue(int id) noexcept + { + return RAJA::messages::queue> { + id, m_bus.get()}; + } + + template + auto get_queue(int id) const noexcept + { + return RAJA::messages::queue> { + id, m_bus.get()}; + } + + iterator begin() noexcept { return iterator {m_bus->m_data}; } + + iterator end() noexcept + { + return iterator {m_bus->m_data + get_num_pending_messages()}; + } + +private: + resource_type m_res; + std::unique_ptr m_bus; +}; + +/// +/// Provides a way to handle messages from a GPU. This currently +/// stores messages from the GPU and then calls a callback +/// function from the host. +/// +/// Note: +/// Currently, this forces a synchronize prior to calling +/// the callback function or testing if there are any messages. +/// +class message_manager +{ +public: + using callback_type = std::function; + using msg_id = int; + using msg_bus = message_bus; + + template + using msg_decay_t = std::decay_t; + +public: + template + message_manager(const std::size_t bus_sz, Resource res) : m_bus {bus_sz, res} + {} + + ~message_manager() = default; + + // Doesn't support copying + message_manager(const message_manager&) = delete; + message_manager& operator=(const message_manager&) = delete; + + // Move ctor/operator + message_manager(message_manager&&) = default; + message_manager& operator=(message_manager&&) = default; + + template + auto get_queue(msg_id id, Callable&& c) + { + return get_queue_impl(id, std::forward(c), + std::function {std::forward(c)}); + } + + void clear() { m_bus.clear_messages(); } + + bool test_any() { return m_bus.has_pending_messages(); } + + void wait_all() + { + if (test_any()) + { + for (auto& msg : m_bus) + { + m_callbacks[msg.id](msg.args); + } + clear(); + } + } + +private: + // TODO: create small wrapper for callables + template + auto get_queue_impl(msg_id id, Callable&& c, std::function) + { + m_callbacks[id] = callback_type {[=](char* msg_args_buf) { + msg_args...>& aligned_args = *std::launder( + reinterpret_cast...>*>(msg_args_buf)); + camp::apply(c, aligned_args); + aligned_args.~msg_args...>(); + }}; + return m_bus.template get_queue...>(id); + } + + msg_bus m_bus; + std::map m_callbacks; +}; + +template +auto make_message_manager(std::size_t bus_sz, Resource r) +{ + return RAJA::message_manager(bus_sz, r); +} + +template +auto make_message_manager(std::size_t bus_sz) +{ + auto r = RAJA::resources::get_default_resource(); + return RAJA::message_manager(bus_sz, r); +} + +} // namespace RAJA + +#endif /* RAJA_MESSAGES_HPP */ diff --git a/include/RAJA/util/msg_header.hpp b/include/RAJA/util/msg_header.hpp new file mode 100644 index 0000000000..63ab64cf6c --- /dev/null +++ b/include/RAJA/util/msg_header.hpp @@ -0,0 +1,49 @@ +/*! + ****************************************************************************** + * + * \file + * + * \brief RAJA header file defining additional helpers for RAJA::messages + * to be properly aligned. + * + ****************************************************************************** + */ + +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +// Copyright (c) 2016-25, Lawrence Livermore National Security, LLC +// and RAJA project contributors. See the RAJA/LICENSE file for details. +// +// SPDX-License-Identifier: (BSD-3-Clause) +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +#ifndef RAJA_MSG_ALIGN_HPP +#define RAJA_MSG_ALIGN_HPP + +#include +#include "camp/tuple.hpp" + +namespace RAJA +{ +RAJA_HOST_DEVICE +constexpr std::size_t align(std::size_t size, std::size_t alignment = 16) +/** Returns the aligned size. This would use `alignof(std::max_align_t)`; + * however, the device side can get a different value compared to the + * host. + * + * @return The size with proper alignment + */ +{ + return ((size + alignment - 1) / alignment) * alignment; +} + +struct msg_header +{ + std::size_t sz; + int id; + char* args; +}; + +template +using msg_args = camp::tuple; +} // namespace RAJA + +#endif // RAJA_MSG_ALIGN_HPP diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 1ef71ebbd7..2f75e4a2e5 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -16,3 +16,4 @@ add_subdirectory(view-layout) add_subdirectory(algorithm) add_subdirectory(workgroup) add_subdirectory(indexing) +add_subdirectory(messages) diff --git a/test/unit/messages/CMakeLists.txt b/test/unit/messages/CMakeLists.txt new file mode 100644 index 0000000000..a16e68735a --- /dev/null +++ b/test/unit/messages/CMakeLists.txt @@ -0,0 +1,11 @@ +############################################################################### +# Copyright (c) 2016-25, Lawrence Livermore National Security, LLC +# and RAJA project contributors. See the RAJA/LICENSE file for details. +# +# SPDX-License-Identifier: (BSD-3-Clause) +############################################################################### + +raja_add_test( + NAME test-messages + SOURCES test-messages.cpp) + diff --git a/test/unit/messages/test-messages.cpp b/test/unit/messages/test-messages.cpp new file mode 100644 index 0000000000..04f288f075 --- /dev/null +++ b/test/unit/messages/test-messages.cpp @@ -0,0 +1,169 @@ +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// +// Copyright (c) 2018-25, Lawrence Livermore National Security, LLC +// and Camp project contributors. See the camp/LICENSE file for details. +// +// SPDX-License-Identifier: (BSD-3-Clause) +//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~// + +#include "camp/array.hpp" +#include "RAJA_test-base.hpp" + +#include "gtest/gtest.h" + +TEST(message_handler, initialize) { + constexpr std::size_t msg_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args)); + constexpr int msg_id = 0; + + auto msg_manager = RAJA::make_message_manager(msg_sz); + + int test = 0; + auto q = msg_manager.get_queue(msg_id, [&](int val) { + test = val; + }); + + ASSERT_EQ(msg_manager.test_any(), false); + ASSERT_EQ(test, 0); +} + +TEST(message_handler, initialize_with_resource) { + constexpr std::size_t msg_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args)); + constexpr int msg_id = 0; + + auto msg_manager = RAJA::make_message_manager(msg_sz, camp::resources::Host()); + + int test = 0; + auto q = msg_manager.get_queue(msg_id, [&](int val) { + test = val; + }); + + ASSERT_EQ(msg_manager.test_any(), false); + ASSERT_EQ(test, 0); +} + +TEST(message_handler, clear) { + constexpr std::size_t msg_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args)); + constexpr int msg_id = 0; + + auto msg_manager = RAJA::make_message_manager(msg_sz); + + int test = 0; + auto q = msg_manager.get_queue(msg_id, [&](int val) { + test = val; + }); + + ASSERT_EQ(q.try_post_message(5), true); + + msg_manager.clear(); + msg_manager.wait_all(); + + ASSERT_EQ(test, 0); +} + +TEST(message_handler, try_post_message) { + constexpr std::size_t msg_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args)); + constexpr int msg_id = 0; + + auto msg_manager = RAJA::make_message_manager(msg_sz); + + int test = 0; + auto q = msg_manager.get_queue(msg_id, [&](int val) { + test = val; + }); + + ASSERT_EQ(test, 0); +} + +TEST(message_handler, try_post_message_overflow) { + constexpr std::size_t msg_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args)); + constexpr int msg_id = 0; + + auto msg_manager = RAJA::make_message_manager(msg_sz); + + int test = 0; + auto q = msg_manager.get_queue(msg_id, [&](int val) { + test = val; + }); + + ASSERT_EQ(q.try_post_message(5), true); + ASSERT_EQ(q.try_post_message(7), false); + + ASSERT_EQ(test, 0); +} + +TEST(message_handler, try_post_message_overwrite) { + // TODO: implement +} + +TEST(message_handler, wait_all) { + constexpr std::size_t msg_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args)); + constexpr int msg_id = 0; + + auto msg_manager = RAJA::make_message_manager(msg_sz); + + int test = 0; + auto q = msg_manager.get_queue(msg_id, [&](int val) { + test = val; + }); + + ASSERT_EQ(q.try_post_message(1), true); + + msg_manager.wait_all(); + + ASSERT_EQ(test, 1); +} + +TEST(message_handler, wait_all_overalloc) { + constexpr std::size_t msg_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args)); + constexpr int msg_id = 0; + + auto msg_manager = RAJA::make_message_manager(2*msg_sz); + + int test = 0; + auto q = msg_manager.get_queue(msg_id, [&](int val) { + test = val; + }); + + ASSERT_EQ(q.try_post_message(1), true); + + msg_manager.wait_all(); + + ASSERT_EQ(test, 1); +} + +TEST(message_handler, wait_all_array) { + constexpr std::size_t msg_sz = RAJA::align(sizeof(RAJA::msg_header)) + + RAJA::align(sizeof(RAJA::msg_args>)); + constexpr int msg_id = 0; + + auto msg_manager = RAJA::make_message_manager(msg_sz); + + camp::array test = {0, 0, 0}; + auto q = msg_manager.get_queue(1, + [&](camp::array val) { + test[0] = val[0]; + test[1] = val[1]; + test[2] = val[2]; + } + ); + + camp::array a{1,2,3}; + ASSERT_EQ(q.try_post_message(a), true); + + msg_manager.wait_all(); + + ASSERT_EQ(test[0], 1); + ASSERT_EQ(test[1], 2); + ASSERT_EQ(test[2], 3); +} + +TEST(message_handler, wait_all_overflow) { + // TODO: implement +} +