diff --git a/cpp/src/umap/simpl_set_embed/algo.cuh b/cpp/src/umap/simpl_set_embed/algo.cuh
index e3c3dab978..8a0e4f540d 100644
--- a/cpp/src/umap/simpl_set_embed/algo.cuh
+++ b/cpp/src/umap/simpl_set_embed/algo.cuh
@@ -1,5 +1,5 @@
 /*
- * SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
+ * SPDX-FileCopyrightText: Copyright (c) 2019-2026, NVIDIA CORPORATION.
  * SPDX-License-Identifier: Apache-2.0
  */

@@ -91,48 +91,6 @@ void optimization_iteration_finalization(
   seed += 1;
 }

-/**
- * Update the embeddings and clear the buffers when using deterministic algorithm.
- */
-template <typename T, typename nnz_t>
-void apply_embedding_updates(T* head_embedding,
-                             T* head_buffer,
-                             int head_n,
-                             T* tail_embedding,
-                             T* tail_buffer,
-                             int tail_n,
-                             UMAPParams* params,
-                             bool move_other,
-                             rmm::cuda_stream_view stream)
-{
-  ASSERT(params->deterministic, "Only used when deterministic is set to true.");
-  nnz_t n_components = params->n_components;
-  if (move_other) {
-    thrust::for_each(rmm::exec_policy(stream),
-                     thrust::make_counting_iterator(0u),
-                     thrust::make_counting_iterator(0u) + std::max(head_n, tail_n) * n_components,
-                     [=] __device__(uint32_t i) {
-                       if (i < head_n * n_components) {
-                         head_embedding[i] += head_buffer[i];
-                         head_buffer[i] = 0.0f;
-                       }
-                       if (i < tail_n * n_components) {
-                         tail_embedding[i] += tail_buffer[i];
-                         tail_buffer[i] = 0.0f;
-                       }
-                     });
-  } else {
-    // No need to update reference embedding
-    thrust::for_each(rmm::exec_policy(stream),
-                     thrust::make_counting_iterator(0u),
-                     thrust::make_counting_iterator(0u) + head_n * n_components,
-                     [=] __device__(uint32_t i) {
-                       head_embedding[i] += head_buffer[i];
-                       head_buffer[i] = 0.0f;
-                     });
-  }
-}
-
 /**
  * \brief Constructs a rounding factor used to truncate elements in a sum such that the
  * sum of the truncated elements is the same no matter what the order of the sum is.
@@ -268,13 +226,15 @@ void optimize_layout(T* head_embedding,
     has_outlier = check_outliers(tail, tail_n, nnz, threshold_for_outlier, stream);
   }

-  if (has_outlier) {
+  if (has_outlier || params->deterministic) {
     // Shuffling is necessary when outliers may be present (i.e., dense points that undergo many
     // updates). It is critical to avoid having too many threads update the same embedding vector
     // simultaneously, as this can affect correctness. By shuffling, potential outlier points are
     // distributed across threads, rather than being processed by consecutive threads that are
     // scheduled together. This approach relies on the GPU's inability to physically schedule all
     // nnz edges at once.
+    // We also shuffle when deterministic behavior is requested, to ensure that updates for the
+    // same vertex are processed in different kernel launches.

     auto first = thrust::make_zip_iterator(thrust::make_tuple(thrust::device_pointer_cast(head),
                                                               thrust::device_pointer_cast(tail),
@@ -292,6 +252,20 @@ void optimize_layout(T* head_embedding,
     }
   }

+  if (has_outlier && params->deterministic) {
+    // For deterministic mode on datasets that are likely to have outliers, we use the heuristic
+    // below to determine the number of chunks.
+    // Empirically determined: 100000 edges per chunk provides a good balance between determinism
+    // and avoiding gradient accumulation issues on large datasets. See benchmarks in
+    // https://github.com/rapidsai/cuml/pull/7597
+    if (nnz > 100000) {
+      num_chunks = raft::ceildiv(nnz, static_cast<nnz_t>(100000));
+    } else if (nnz > 10000) {
+      // more fine-grained chunks for a smaller number of edges
+      num_chunks = raft::ceildiv(nnz, static_cast<nnz_t>(10000));
+    }
+  }
+
   rmm::device_uvector<T> epoch_of_next_negative_sample(nnz, stream);
   T nsr_inv = T(1.0) / params->negative_sample_rate;
   raft::linalg::unaryOp(
@@ -307,22 +281,39 @@ void optimize_layout(T* head_embedding,
   // Buffers used to store the gradient updates to avoid conflicts
   rmm::device_uvector<T> head_buffer(0, stream_view);
   rmm::device_uvector<T> tail_buffer(0, stream_view);
+
+  // Flags to track which vertices were modified per chunk (for sparse apply)
+  rmm::device_uvector<uint32_t> head_flags(0, stream_view);
+  rmm::device_uvector<uint32_t> tail_flags(0, stream_view);
   // Write to embedding directly if deterministic is not needed.
-  T* d_head_buffer = head_embedding;
-  T* d_tail_buffer = tail_embedding;
+  T* d_head_buffer       = head_embedding;
+  T* d_tail_buffer       = tail_embedding;
+  uint32_t* d_head_flags = nullptr;
+  uint32_t* d_tail_flags = nullptr;
   if (params->deterministic) {
     nnz_t n_components = params->n_components;
     head_buffer.resize(head_n * n_components, stream_view);
     RAFT_CUDA_TRY(
       cudaMemsetAsync(head_buffer.data(), '\0', sizeof(T) * head_buffer.size(), stream));
+
+    int head_flag_words = raft::ceildiv(head_n, 32);
+    head_flags.resize(head_flag_words, stream_view);
+    RAFT_CUDA_TRY(
+      cudaMemsetAsync(head_flags.data(), '\0', sizeof(uint32_t) * head_flag_words, stream));
+
+    d_head_buffer = head_buffer.data();
+    d_head_flags  = head_flags.data();
     // No need for tail if it's not being written.
     if (move_other) {
       tail_buffer.resize(tail_n * n_components, stream_view);
       RAFT_CUDA_TRY(
         cudaMemsetAsync(tail_buffer.data(), '\0', sizeof(T) * tail_buffer.size(), stream));
+      int tail_flag_words = raft::ceildiv(tail_n, 32);
+      tail_flags.resize(tail_flag_words, stream_view);
+      RAFT_CUDA_TRY(
+        cudaMemsetAsync(tail_flags.data(), '\0', sizeof(uint32_t) * tail_flag_words, stream));
+      d_tail_flags  = tail_flags.data();
+      d_tail_buffer = tail_buffer.data();
     }
-    d_head_buffer = head_buffer.data();
-    d_tail_buffer = tail_buffer.data();
   }

   // we keep the tpb and change the number of blocks we launch to handle the total number of threads
@@ -335,8 +326,11 @@ void optimize_layout(T* head_embedding,
   for (int n = 0; n < n_epochs; n++) {
     call_optimize_batch_kernel<T, TPB_X>(head_embedding,
                                          d_head_buffer,
+                                         d_head_flags,
+                                         head_n,
                                          tail_embedding,
                                          d_tail_buffer,
+                                         d_tail_flags,
                                          tail_n,
                                          head,
                                          tail,
@@ -354,17 +348,6 @@ void optimize_layout(T* head_embedding,
                                          blk,
                                          stream,
                                          rounding);
-    if (params->deterministic) {
-      apply_embedding_updates(head_embedding,
-                              d_head_buffer,
-                              head_n,
-                              tail_embedding,
-                              d_tail_buffer,
-                              tail_n,
-                              params,
-                              move_other,
-                              stream_view);
-    }
     RAFT_CUDA_TRY(cudaGetLastError());
     optimization_iteration_finalization(params, head_embedding, alpha, n, n_epochs, seed);
   }
diff --git a/cpp/src/umap/simpl_set_embed/optimize_batch_kernel.cuh b/cpp/src/umap/simpl_set_embed/optimize_batch_kernel.cuh
index 33abacf588..9940914d31 100644
--- a/cpp/src/umap/simpl_set_embed/optimize_batch_kernel.cuh
+++ b/cpp/src/umap/simpl_set_embed/optimize_batch_kernel.cuh
@@ -1,5 +1,5 @@
 /*
- * SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION.
+ * SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION.
  * SPDX-License-Identifier: Apache-2.0
  */

@@ -13,6 +13,8 @@
 #include
 #include

+#include
+
 #include
 #include
@@ -23,6 +25,87 @@
 namespace Algo {

 using namespace ML;

+/**
+ * Set a bit in the flags to mark a vertex as modified.
+ */
+__device__ __forceinline__ void set_vertex_modified(uint32_t* flags, int vertex_id)
+{
+  int word_idx = vertex_id >> 5;  // vertex_id / 32
+  int bit_idx  = vertex_id & 31;  // vertex_id % 32
+  atomicOr(&flags[word_idx], 1u << bit_idx);
+}
+
+/**
+ * Sparse apply kernel: only updates vertices that have their bit set in the flags.
+ */
+template <typename T, int TPB_X>
+CUML_KERNEL void sparse_apply_kernel(
+  T* embedding, T* buffer, uint32_t* flags, int n_vertices, int n_components)
+{
+  int tid           = blockIdx.x * TPB_X + threadIdx.x;
+  int total_threads = gridDim.x * TPB_X;
+
+  int n_words = (n_vertices + 31) >> 5;
+
+  // each thread strides through the flags in word units
+  for (int word_idx = tid; word_idx < n_words; word_idx += total_threads) {
+    uint32_t word = flags[word_idx];
+    if (word == 0) continue;  // skip if no bits set
+
+    while (word != 0) {
+      int bit_pos   = __ffs(word) - 1;            // find first set bit (1-indexed, so -1)
+      int vertex_id = (word_idx << 5) + bit_pos;  // word_idx * 32 (word size) + bit_pos
+
+      if (vertex_id < n_vertices) {
+        // Apply update for this vertex
+        int base = vertex_id * n_components;
+        for (int d = 0; d < n_components; d++) {
+          embedding[base + d] += buffer[base + d];
+          buffer[base + d] = T(0.0);
+        }
+      }
+
+      // Clear the first set bit just processed
+      word &= ~(1u << bit_pos);
+    }
+
+    flags[word_idx] = 0;  // clear flag for next chunk
+  }
+}
+
+/**
+ * Update the embeddings with a sparse apply, using bit flags to track which vertices received
+ * gradient updates.
+ */
+template <typename T, int TPB_X>
+void sparse_apply_embedding_updates(T* head_embedding,
+                                    T* head_buffer,
+                                    uint32_t* head_flags,
+                                    int head_n,
+                                    T* tail_embedding,
+                                    T* tail_buffer,
+                                    uint32_t* tail_flags,
+                                    int tail_n,
+                                    UMAPParams const* params,
+                                    bool move_other,
+                                    cudaStream_t stream)
+{
+  // flags: one thread per 32-vertex word
+  int head_words = raft::ceildiv(head_n, 32);
+  dim3 grid_head(raft::ceildiv(head_words, TPB_X), 1, 1);
+  dim3 blk(TPB_X, 1, 1);
+
+  sparse_apply_kernel<T, TPB_X><<<grid_head, blk, 0, stream>>>(
+    head_embedding, head_buffer, head_flags, head_n, params->n_components);
+
+  if (move_other) {
+    int tail_words = raft::ceildiv(tail_n, 32);
+    dim3 grid_tail(raft::ceildiv(tail_words, TPB_X), 1, 1);
+    sparse_apply_kernel<T, TPB_X><<<grid_tail, blk, 0, stream>>>(
+      tail_embedding, tail_buffer, tail_flags, tail_n, params->n_components);
+  }
+}
+
 /**
  * Calculate the squared distance between two vectors of size n
  * @{
@@ -90,8 +173,10 @@ DI T truncate_gradient(T const rounding_factor, T const x)
 template <typename T, int TPB_X, int n_components>
 CUML_KERNEL void optimize_batch_kernel_reg(T const* head_embedding,
                                            T* head_buffer,
+                                           uint32_t* head_flags,
                                            T const* tail_embedding,
                                            T* tail_buffer,
+                                           uint32_t* tail_flags,
                                            MLCommon::FastIntDiv tail_n,
                                            const int* head,
                                            const int* tail,
@@ -106,17 +191,28 @@ CUML_KERNEL void optimize_batch_kernel_reg(T const* head_embedding,
                                            bool move_other,
                                            UMAPParams params,
                                            T nsr_inv,
-                                           T rounding)
+                                           T rounding,
+                                           size_t offset = 0)
 {
-  size_t row = (static_cast<size_t>(blockIdx.x) * static_cast<size_t>(TPB_X)) + threadIdx.x;
+  size_t row =
+    (static_cast<size_t>(blockIdx.x) * static_cast<size_t>(TPB_X)) + threadIdx.x + offset;
   size_t skip_size = static_cast<size_t>(blockDim.x) * gridDim.x;
   T current_reg[n_components], other_reg[n_components], grads[n_components];
+
+  // Deterministic mode: process exactly one row per thread (no grid-stride loop)
+  // Non-deterministic mode: use grid-stride loop to process multiple rows
   while (row < nnz) {
     auto _epoch_of_next_sample = epoch_of_next_sample[row];
     if (_epoch_of_next_sample > epoch) {
-      row += skip_size;
-      continue;
+      if (params.deterministic) {
+        // we return immediately in deterministic mode instead of continuing the grid-stride loop
+        // because we launch a new kernel for the next chunk
+        return;
+      } else {
+        row += skip_size;
+        continue;
+      }
     }
     auto _epochs_per_sample = epochs_per_sample[row];
     auto epochs_per_negative_sample = _epochs_per_sample * nsr_inv;
@@ -158,6 +254,7 @@ CUML_KERNEL void optimize_batch_kernel_reg(T const* head_embedding,
       for (int d = 0; d < n_components; d++) {
         raft::myAtomicAdd(oth_write + d, truncate_gradient(rounding, -grads[d]));
       }
+      if (tail_flags != nullptr) { set_vertex_modified(tail_flags, k); }
     }
     epoch_of_next_sample[row] = _epoch_of_next_sample + _epochs_per_sample;
     // number of negative samples to choose
@@ -203,17 +300,26 @@ CUML_KERNEL void optimize_batch_kernel_reg(T const* head_embedding,
     for (int d = 0; d < n_components; d++) {
       raft::myAtomicAdd(cur_write + d, truncate_gradient(rounding, grads[d]));
     }
+    if (head_flags != nullptr) { set_vertex_modified(head_flags, j); }
+
     epoch_of_next_negative_sample[row] =
       _epoch_of_next_negative_sample + n_neg_samples * epochs_per_negative_sample;
-    row += skip_size;
+
+    if (params.deterministic) {
+      return;  // Only process one row in deterministic mode
+    } else {
+      row += skip_size;
+    }
   }
 }

 template <typename T, int TPB_X, bool use_shared_mem>
 CUML_KERNEL void optimize_batch_kernel(T const* head_embedding,
                                        T* head_buffer,
+                                       uint32_t* head_flags,
                                        T const* tail_embedding,
                                        T* tail_buffer,
+                                       uint32_t* tail_flags,
                                        MLCommon::FastIntDiv tail_n,
                                        const int* head,
                                        const int* tail,
@@ -228,17 +334,25 @@ CUML_KERNEL void optimize_batch_kernel(T const* head_embedding,
                                        bool move_other,
                                        UMAPParams params,
                                        T nsr_inv,
-                                       T rounding)
+                                       T rounding,
+                                       size_t offset = 0)
 {
   extern __shared__ T embedding_shared_mem_updates[];
-  size_t row = (static_cast<size_t>(blockIdx.x) * static_cast<size_t>(TPB_X)) + threadIdx.x;
+  size_t row =
+    (static_cast<size_t>(blockIdx.x) * static_cast<size_t>(TPB_X)) + threadIdx.x + offset;
   size_t skip_size = static_cast<size_t>(blockDim.x) * gridDim.x;
   while (row < nnz) {
     auto _epoch_of_next_sample = epoch_of_next_sample[row];
     if (_epoch_of_next_sample > epoch) {
-      row += skip_size;
-      continue;
+      if (params.deterministic) {
+        // we return immediately in deterministic mode instead of continuing the grid-stride loop
+        // because we launch a new kernel for the next chunk
+        return;
+      } else {
+        row += skip_size;
+        continue;
+      }
     }
     auto _epochs_per_sample = epochs_per_sample[row];
     auto epochs_per_negative_sample = _epochs_per_sample * nsr_inv;
@@ -294,12 +408,18 @@ CUML_KERNEL void optimize_batch_kernel(T const* head_embedding,
         }
       }
     }
+
+    if constexpr (!use_shared_mem) {
+      if (head_flags != nullptr) { set_vertex_modified(head_flags, j); }
+      if (move_other && tail_flags != nullptr) { set_vertex_modified(tail_flags, k); }
+    }
     // storing gradients for negative samples back to global memory
     if (use_shared_mem && move_other) {
       for (int d = 0; d < n_components; d++) {
         auto grad = grads_buffer[d * TPB_X];
         raft::myAtomicAdd((T*)oth_write + d, truncate_gradient(rounding, -grad));
       }
+      if (tail_flags != nullptr) { set_vertex_modified(tail_flags, k); }
     }
     epoch_of_next_sample[row] = _epoch_of_next_sample + _epochs_per_sample;
     // number of negative samples to choose
@@ -358,10 +478,15 @@ CUML_KERNEL void optimize_batch_kernel(T const* head_embedding,
         raft::myAtomicAdd((T*)cur_write + d,
                           truncate_gradient(rounding, grads_buffer[d * TPB_X]));
       }
+      if (head_flags != nullptr) { set_vertex_modified(head_flags, j); }
     }
     epoch_of_next_negative_sample[row] =
       _epoch_of_next_negative_sample + n_neg_samples * epochs_per_negative_sample;
-    row += skip_size;
+    if (params.deterministic) {
+      return;  // Only process one row in deterministic mode
+    } else {
+      row += skip_size;
+    }
   }
 }
@@ -370,6 +495,8 @@ CUML_KERNEL void optimize_batch_kernel(T const* head_embedding,
  * result is required. They are the same pointer if random seed is not
  * provided.
  * @param tail_buffer: Similar to head_buffer, but for tail_embedding.
+ * @param head_flags: flags tracking which head vertices were modified (for sparse apply).
+ * @param tail_flags: flags tracking which tail vertices were modified (for sparse apply).
  * @param head: Row index in COO connectivity graph.
  * @param tail: Column index in COO connectivity graph.
  * @param alpha: Learning rate
@@ -378,10 +505,13 @@ CUML_KERNEL void optimize_batch_kernel(T const* head_embedding,
  * deterministic result.
  */
 template <typename T, int TPB_X>
-void call_optimize_batch_kernel(T const* head_embedding,
+void call_optimize_batch_kernel(T* head_embedding,
                                 T* head_buffer,
-                                T const* tail_embedding,
+                                uint32_t* head_flags,
+                                int head_n,
+                                T* tail_embedding,
                                 T* tail_buffer,
+                                uint32_t* tail_flags,
                                 int tail_n,
                                 const int* head,
                                 const int* tail,
@@ -404,72 +534,116 @@ void call_optimize_batch_kernel(T const* head_embedding,
   requiredSize *= sizeof(T);
   bool use_shared_mem = requiredSize < static_cast<size_t>(raft::getSharedMemPerBlock());
   T nsr_inv = T(1.0) / params->negative_sample_rate;
-  if (params->n_components == 2) {
-    // multicore implementation with registers
-    optimize_batch_kernel_reg<T, TPB_X, 2>
-      <<<grid, blk, 0, stream>>>(head_embedding,
-                                 head_buffer,
-                                 tail_embedding,
-                                 tail_buffer,
-                                 tail_n,
-                                 head,
-                                 tail,
-                                 nnz,
-                                 epochs_per_sample,
-                                 epoch_of_next_negative_sample,
-                                 epoch_of_next_sample,
-                                 alpha,
-                                 n,
-                                 gamma,
-                                 seed,
-                                 move_other,
-                                 *params,
-                                 nsr_inv,
-                                 rounding);
-  } else if (use_shared_mem) {
-    // multicore implementation with shared memory
-    optimize_batch_kernel<T, TPB_X, true>
-      <<<grid, blk, requiredSize, stream>>>(head_embedding,
-                                            head_buffer,
-                                            tail_embedding,
-                                            tail_buffer,
-                                            tail_n,
-                                            head,
-                                            tail,
-                                            nnz,
-                                            epochs_per_sample,
-                                            epoch_of_next_negative_sample,
-                                            epoch_of_next_sample,
-                                            alpha,
-                                            n,
-                                            gamma,
-                                            seed,
-                                            move_other,
-                                            *params,
-                                            nsr_inv,
-                                            rounding);
+
+  auto stream_view = rmm::cuda_stream_view(stream);
+
+  auto launch_kernel = [&](size_t offset = 0) {
+    if (params->n_components == 2) {
+      // multicore implementation with registers
+      optimize_batch_kernel_reg<T, TPB_X, 2>
+        <<<grid, blk, 0, stream>>>(head_embedding,
+                                   head_buffer,
+                                   head_flags,
+                                   tail_embedding,
+                                   tail_buffer,
+                                   tail_flags,
+                                   tail_n,
+                                   head,
+                                   tail,
+                                   nnz,
+                                   epochs_per_sample,
+                                   epoch_of_next_negative_sample,
+                                   epoch_of_next_sample,
+                                   alpha,
+                                   n,
+                                   gamma,
+                                   seed,
+                                   move_other,
+                                   *params,
+                                   nsr_inv,
+                                   rounding,
+                                   offset);
+    } else if (use_shared_mem) {
+      // multicore implementation with shared memory
+      optimize_batch_kernel<T, TPB_X, true>
+        <<<grid, blk, requiredSize, stream>>>(head_embedding,
+                                              head_buffer,
+                                              head_flags,
+                                              tail_embedding,
+                                              tail_buffer,
+                                              tail_flags,
+                                              tail_n,
+                                              head,
+                                              tail,
+                                              nnz,
+                                              epochs_per_sample,
+                                              epoch_of_next_negative_sample,
+                                              epoch_of_next_sample,
+                                              alpha,
+                                              n,
+                                              gamma,
+                                              seed,
+                                              move_other,
+                                              *params,
+                                              nsr_inv,
+                                              rounding,
+                                              offset);
+    } else {
+      // multicore implementation without shared memory
+      optimize_batch_kernel<T, TPB_X, false>
+        <<<grid, blk, 0, stream>>>(head_embedding,
+                                   head_buffer,
+                                   head_flags,
+                                   tail_embedding,
+                                   tail_buffer,
+                                   tail_flags,
+                                   tail_n,
+                                   head,
+                                   tail,
+                                   nnz,
+                                   epochs_per_sample,
+                                   epoch_of_next_negative_sample,
+                                   epoch_of_next_sample,
+                                   alpha,
+                                   n,
+                                   gamma,
+                                   seed,
+                                   move_other,
+                                   *params,
+                                   nsr_inv,
+                                   rounding,
+                                   offset);
+    }
+  };
+
+  if (params->deterministic) {
+    // For deterministic mode, we enforce sequential behavior using chunks: after one chunk
+    // (TPB_X * grid.x threads, each doing an update) is processed, we apply the gradients in
+    // the buffer to the resulting embedding. The next chunk is then processed based on the
+    // updated embedding from the previous chunk. This ensures that the updates are applied in a
+    // sequential manner (which is crucial for avoiding outliers), and the resulting embedding
+    // is deterministic because we use buffers.
+    size_t chunk_size = static_cast<size_t>(TPB_X) * static_cast<size_t>(grid.x);
+    for (size_t offset = 0; offset < nnz; offset += chunk_size) {
+      launch_kernel(offset);
+      sparse_apply_embedding_updates<T, TPB_X>(head_embedding,
+                                               head_buffer,
+                                               head_flags,
+                                               head_n,
+                                               tail_embedding,
+                                               tail_buffer,
+                                               tail_flags,
+                                               tail_n,
+                                               params,
+                                               move_other,
+                                               stream);
+    }
   } else {
-    // multicore implementation without shared memory
-    optimize_batch_kernel<T, TPB_X, false>
-      <<<grid, blk, 0, stream>>>(head_embedding,
-                                 head_buffer,
-                                 tail_embedding,
-                                 tail_buffer,
-                                 tail_n,
-                                 head,
-                                 tail,
-                                 nnz,
-                                 epochs_per_sample,
-                                 epoch_of_next_negative_sample,
-                                 epoch_of_next_sample,
-                                 alpha,
-                                 n,
-                                 gamma,
-                                 seed,
-                                 move_other,
-                                 *params,
-                                 nsr_inv,
-                                 rounding);
+    // For non-deterministic mode, we launch the kernel once with nnz / n_chunks threads.
+    // Each thread strides through n_chunks updates and writes its gradients immediately to the
+    // resulting embedding inside the kernel, increasing the chances of a sequential update,
+    // which is crucial for avoiding outliers.
+    launch_kernel(0);
   }
 }
 }  // namespace Algo
diff --git a/python/cuml/tests/test_umap.py b/python/cuml/tests/test_umap.py
index 6e804d7de1..71f0fca370 100644
--- a/python/cuml/tests/test_umap.py
+++ b/python/cuml/tests/test_umap.py
@@ -936,8 +936,16 @@ def test_umap_small_fit_large_transform():

 @pytest.mark.parametrize("n_neighbors", [5, 15])
 @pytest.mark.parametrize("n_components", [2, 5])
-def test_umap_outliers(n_neighbors, n_components):
-    n_rows = 50_000
+@pytest.mark.parametrize("random_state", [None, 42])
+def test_umap_outliers(n_neighbors, n_components, random_state):
+    if random_state is None:
+        n_rows = 50_000
+        build_algo = "nn_descent"
+        init = "spectral"
+    else:
+        n_rows = 100_000
+        build_algo = "brute_force_knn"
+        init = "random"

     # This dataset was specifically chosen because UMAP produces outliers
     # on this dataset before the outlier fix.
@@ -945,10 +953,11 @@ def test_umap_outliers(n_neighbors, n_components):
     data = data.astype(np.float32)

     gpu_umap = cuUMAP(
-        build_algo="nn_descent",
-        init="spectral",
+        build_algo=build_algo,
+        init=init,
         n_neighbors=n_neighbors,
         n_components=n_components,
+        random_state=random_state,
     )

     gpu_umap_embeddings = gpu_umap.fit_transform(data)
@@ -956,7 +965,10 @@ def test_umap_outliers(n_neighbors, n_components):
     # and comparing the max and min values of the resulting embedding. However, running CPU UMAP
    # with this data size using spectral initialization is too slow to run repetitively in CI.
     # Instead, we hardwire a locally determined threshold for this dataset.
-    threshold = 50.0
+    if random_state is None:
+        threshold = 50.0
+    else:
+        threshold = 25.0

     assert np.all(
         (gpu_umap_embeddings >= -threshold)
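
Note (illustrative only, not part of the patch): the sparse-apply path above marks each modified vertex with one bit (set_vertex_modified) and later walks only the set bits (sparse_apply_kernel). The host-side C++ sketch below shows the same word/bit arithmetic in isolation; the helper names mark_vertex and apply_marked are invented for the example, and __builtin_ctz (GCC/Clang) stands in for the device-side __ffs.

// Host-side sketch of the bit-flag scheme used by the sparse apply.
#include <cstdint>
#include <vector>

// Mark vertex `v` as modified: one bit per vertex, 32 vertices per 32-bit word.
void mark_vertex(std::vector<uint32_t>& flags, int v) { flags[v >> 5] |= 1u << (v & 31); }

// Fold buffered updates into the embedding, but only for marked vertices.
void apply_marked(std::vector<float>& embedding,
                  std::vector<float>& buffer,
                  std::vector<uint32_t>& flags,
                  int n_vertices,
                  int n_components)
{
  int n_words = (n_vertices + 31) >> 5;
  for (int w = 0; w < n_words; ++w) {
    uint32_t word = flags[w];
    while (word != 0) {
      int bit = __builtin_ctz(word);  // lowest set bit (the device code uses __ffs(word) - 1)
      int v   = (w << 5) + bit;       // vertex id = word index * 32 + bit position
      if (v < n_vertices) {
        for (int d = 0; d < n_components; ++d) {
          embedding[v * n_components + d] += buffer[v * n_components + d];
          buffer[v * n_components + d] = 0.0f;
        }
      }
      word &= word - 1;  // clear the lowest set bit
    }
    flags[w] = 0;  // reset for the next chunk
  }
}

int main()
{
  const int n_vertices = 40, n_components = 2;
  std::vector<float> embedding(n_vertices * n_components, 0.0f);
  std::vector<float> buffer(n_vertices * n_components, 0.0f);
  std::vector<uint32_t> flags((n_vertices + 31) / 32, 0);

  buffer[3 * n_components] = 0.5f;  // pretend vertex 3 received a gradient update
  mark_vertex(flags, 3);
  apply_marked(embedding, buffer, flags, n_vertices, n_components);
  return embedding[3 * n_components] == 0.5f ? 0 : 1;  // 0 on success
}

Because each word is skipped entirely when no bits are set, whole 32-vertex ranges that received no updates in a chunk cost only a single load, which is what makes the per-chunk apply cheap compared to rewriting the full embedding.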
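A second sketch (also illustrative, not part of the patch) of the chunked schedule the deterministic path of call_optimize_batch_kernel uses: gradients produced within a chunk are accumulated into a buffer and only folded into the embedding between chunks, so every edge in a chunk reads the same embedding state. In the real kernels each addend is additionally truncated via truncate_gradient so that atomic-add order inside a chunk cannot change the sum; the serial loop below (one component per vertex for brevity, Edge and the function name made up) ignores that detail.

#include <algorithm>
#include <cstddef>
#include <vector>

struct Edge {
  int vertex;
  float grad;
};

void process_deterministically(std::vector<float>& embedding,
                               const std::vector<Edge>& edges,
                               std::size_t chunk_size)
{
  std::vector<float> buffer(embedding.size(), 0.0f);
  for (std::size_t offset = 0; offset < edges.size(); offset += chunk_size) {
    std::size_t end = std::min(offset + chunk_size, edges.size());
    // "Kernel launch": every edge in this chunk only accumulates its gradient into the buffer,
    // never into the embedding directly.
    for (std::size_t i = offset; i < end; ++i) {
      buffer[edges[i].vertex] += edges[i].grad;
    }
    // "Sparse apply": fold the buffered updates in and clear the buffer, so the next chunk
    // sees the embedding produced by all previous chunks.
    for (std::size_t v = 0; v < embedding.size(); ++v) {
      embedding[v] += buffer[v];
      buffer[v] = 0.0f;
    }
  }
}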