2222#include < cudf/column/column.hpp>
2323#include < cudf/column/column_factories.hpp>
2424#include < cudf/column/column_view.hpp>
25- #include < cudf/copying.hpp>
2625#include < cudf/detail/aggregation/aggregation.cuh>
2726#include < cudf/detail/aggregation/aggregation.hpp>
2827#include < cudf/detail/aggregation/result_cache.hpp>
2928#include < cudf/detail/binaryop.hpp>
29+ #include < cudf/detail/cuco_helpers.hpp>
3030#include < cudf/detail/gather.hpp>
3131#include < cudf/detail/groupby.hpp>
3232#include < cudf/detail/null_mask.hpp>
33- #include < cudf/detail/replace.hpp>
3433#include < cudf/detail/unary.hpp>
35- #include < cudf/detail/utilities/algorithm.cuh>
36- #include < cudf/detail/utilities/cuda.cuh>
3734#include < cudf/detail/utilities/vector_factories.hpp>
3835#include < cudf/dictionary/dictionary_column_view.hpp>
3936#include < cudf/groupby.hpp>
4037#include < cudf/hashing/detail/default_hash.cuh>
41- #include < cudf/scalar/scalar.hpp>
4238#include < cudf/table/experimental/row_operators.cuh>
4339#include < cudf/table/table.hpp>
4440#include < cudf/table/table_device_view.cuh>
4945
5046#include < rmm/cuda_stream_view.hpp>
5147
52- #include < cuda/functional>
53- #include < cuda/std/atomic>
54- #include < thrust/copy.h>
48+ #include < cuco/static_set.cuh>
5549#include < thrust/for_each.h>
5650#include < thrust/iterator/counting_iterator.h>
57- #include < thrust/iterator/transform_iterator.h>
5851
5952#include < memory>
6053#include < unordered_set>
@@ -66,15 +59,12 @@ namespace detail {
6659namespace hash {
6760namespace {
6861
69- // TODO: replace it with `cuco::static_map`
70- // https://github.com/rapidsai/cudf/issues/10401
71- template <typename ComparatorType>
72- using map_type = concurrent_unordered_map<
73- cudf::size_type,
74- cudf::size_type,
62+ // TODO: similar to `contains_table`, using larger CG size like 2 or 4 for nested
63+ // types and `cg_size = 1`for flat data to improve performance
64+ using probing_scheme_type = cuco::linear_probing<
65+ 1 , // /< Number of threads used to handle each input key
7566 cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
76- cudf::nullate::DYNAMIC>,
77- ComparatorType>;
67+ cudf::nullate::DYNAMIC>>;
7868
7969/* *
8070 * @brief List of aggregation operations that can be computed with a hash-based
@@ -190,14 +180,14 @@ class groupby_simple_aggregations_collector final
190180 }
191181};
192182
193- template <typename ComparatorType >
183+ template <typename SetType >
194184class hash_compound_agg_finalizer final : public cudf::detail::aggregation_finalizer {
195185 column_view col;
196186 data_type result_type;
197187 cudf::detail::result_cache* sparse_results;
198188 cudf::detail::result_cache* dense_results;
199189 device_span<size_type const > gather_map;
200- map_type<ComparatorType> const & map ;
190+ SetType set ;
201191 bitmask_type const * __restrict__ row_bitmask;
202192 rmm::cuda_stream_view stream;
203193 rmm::mr::device_memory_resource* mr;
@@ -209,15 +199,15 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
209199 cudf::detail::result_cache* sparse_results,
210200 cudf::detail::result_cache* dense_results,
211201 device_span<size_type const > gather_map,
212- map_type<ComparatorType> const & map ,
202+ SetType set ,
213203 bitmask_type const * row_bitmask,
214204 rmm::cuda_stream_view stream,
215205 rmm::mr::device_memory_resource* mr)
216206 : col(col),
217207 sparse_results (sparse_results),
218208 dense_results(dense_results),
219209 gather_map(gather_map),
220- map(map ),
210+ set(set ),
221211 row_bitmask(row_bitmask),
222212 stream(stream),
223213 mr(mr)
@@ -340,8 +330,8 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
340330 rmm::exec_policy (stream),
341331 thrust::make_counting_iterator (0 ),
342332 col.size (),
343- ::cudf::detail::var_hash_functor<map_type<ComparatorType>> {
344- map , row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof });
333+ ::cudf::detail::var_hash_functor{
334+ set , row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof });
345335 sparse_results->add_result (col, agg, std::move (var_result));
346336 dense_results->add_result (col, agg, to_dense_agg_result (agg));
347337 }
@@ -398,13 +388,13 @@ flatten_single_pass_aggs(host_span<aggregation_request const> requests)
398388 *
399389 * @see groupby_null_templated()
400390 */
401- template <typename ComparatorType >
391+ template <typename SetType >
402392void sparse_to_dense_results (table_view const & keys,
403393 host_span<aggregation_request const > requests,
404394 cudf::detail::result_cache* sparse_results,
405395 cudf::detail::result_cache* dense_results,
406396 device_span<size_type const > gather_map,
407- map_type<ComparatorType> const & map ,
397+ SetType set ,
408398 bool keys_have_nulls,
409399 null_policy include_null_keys,
410400 rmm::cuda_stream_view stream,
@@ -423,7 +413,7 @@ void sparse_to_dense_results(table_view const& keys,
423413 // Given an aggregation, this will get the result from sparse_results and
424414 // convert and return dense, compacted result
425415 auto finalizer = hash_compound_agg_finalizer (
426- col, sparse_results, dense_results, gather_map, map , row_bitmask_ptr, stream, mr);
416+ col, sparse_results, dense_results, gather_map, set , row_bitmask_ptr, stream, mr);
427417 for (auto && agg : agg_v) {
428418 agg->finalize (finalizer);
429419 }
@@ -467,11 +457,11 @@ auto create_sparse_results_table(table_view const& flattened_values,
467457 * @brief Computes all aggregations from `requests` that require a single pass
468458 * over the data and stores the results in `sparse_results`
469459 */
470- template <typename ComparatorType >
460+ template <typename SetType >
471461void compute_single_pass_aggs (table_view const & keys,
472462 host_span<aggregation_request const > requests,
473463 cudf::detail::result_cache* sparse_results,
474- map_type<ComparatorType>& map ,
464+ SetType set ,
475465 bool keys_have_nulls,
476466 null_policy include_null_keys,
477467 rmm::cuda_stream_view stream)
@@ -494,16 +484,16 @@ void compute_single_pass_aggs(table_view const& keys,
494484 ? cudf::detail::bitmask_and (keys, stream, rmm::mr::get_current_device_resource ()).first
495485 : rmm::device_buffer{};
496486
497- thrust::for_each_n (rmm::exec_policy (stream),
498- thrust::make_counting_iterator ( 0 ),
499- keys. num_rows ( ),
500- hash::compute_single_pass_aggs_fn<map_type<ComparatorType>>{
501- map ,
502- *d_values,
503- *d_sparse_table,
504- d_aggs.data (),
505- static_cast <bitmask_type*>(row_bitmask.data ()),
506- skip_key_rows_with_nulls});
487+ thrust::for_each_n (
488+ rmm::exec_policy (stream ),
489+ thrust::make_counting_iterator ( 0 ),
490+ keys. num_rows (),
491+ hash::compute_single_pass_aggs_fn{set ,
492+ *d_values,
493+ *d_sparse_table,
494+ d_aggs.data (),
495+ static_cast <bitmask_type*>(row_bitmask.data ()),
496+ skip_key_rows_with_nulls});
507497 // Add results back to sparse_results cache
508498 auto sparse_result_cols = sparse_table.release ();
509499 for (size_t i = 0 ; i < aggs.size (); i++) {
@@ -517,23 +507,15 @@ void compute_single_pass_aggs(table_view const& keys,
517507 * @brief Computes and returns a device vector containing all populated keys in
518508 * `map`.
519509 */
520- template <typename ComparatorType >
521- rmm::device_uvector<size_type> extract_populated_keys (map_type<ComparatorType> const & map ,
510+ template <typename SetType >
511+ rmm::device_uvector<size_type> extract_populated_keys (SetType const & key_set ,
522512 size_type num_keys,
523513 rmm::cuda_stream_view stream)
524514{
525515 rmm::device_uvector<size_type> populated_keys (num_keys, stream);
516+ auto const keys_end = key_set.retrieve_all (populated_keys.begin (), stream.value ());
526517
527- auto const get_key = cuda::proclaim_return_type<typename map_type<ComparatorType>::key_type>(
528- [] __device__ (auto const & element) { return element.first ; }); // first = key
529- auto const key_used = [unused = map.get_unused_key ()] __device__ (auto key) {
530- return key != unused;
531- };
532- auto const key_itr = thrust::make_transform_iterator (map.data (), get_key);
533- auto const end_it = cudf::detail::copy_if_safe (
534- key_itr, key_itr + map.capacity (), populated_keys.begin (), key_used, stream);
535-
536- populated_keys.resize (std::distance (populated_keys.begin (), end_it), stream);
518+ populated_keys.resize (std::distance (populated_keys.begin (), keys_end), stream);
537519 return populated_keys;
538520}
539521
@@ -580,38 +562,41 @@ std::unique_ptr<table> groupby(table_view const& keys,
580562 auto const row_hash = cudf::experimental::row::hash::row_hasher{std::move (preprocessed_keys)};
581563 auto const d_row_hash = row_hash.device_hasher (has_null);
582564
583- size_type constexpr unused_key{std::numeric_limits<size_type>::max ()};
584- size_type constexpr unused_value{std::numeric_limits<size_type>::max ()};
585-
586565 // Cache of sparse results where the location of aggregate value in each
587- // column is indexed by the hash map
566+ // column is indexed by the hash set
588567 cudf::detail::result_cache sparse_results (requests.size ());
589568
590569 auto const comparator_helper = [&](auto const d_key_equal) {
591- using allocator_type = typename map_type<decltype (d_key_equal)>::allocator_type;
592-
593- auto const map = map_type<decltype (d_key_equal)>::create (compute_hash_table_size (num_keys),
594- stream,
595- unused_key,
596- unused_value,
597- d_row_hash,
598- d_key_equal,
599- allocator_type ());
600- // Compute all single pass aggs first
601- compute_single_pass_aggs (
602- keys, requests, &sparse_results, *map, keys_have_nulls, include_null_keys, stream);
570+ auto const set = cuco::static_set{num_keys,
571+ 0.5 , // desired load factor
572+ cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
573+ d_key_equal,
574+ probing_scheme_type{d_row_hash},
575+ cuco::thread_scope_device,
576+ cuco::storage<1 >{},
577+ cudf::detail::cuco_allocator{stream},
578+ stream.value ()};
603579
604- // Extract the populated indices from the hash map and create a gather map.
580+ // Compute all single pass aggs first
581+ compute_single_pass_aggs (keys,
582+ requests,
583+ &sparse_results,
584+ set.ref (cuco::insert_and_find),
585+ keys_have_nulls,
586+ include_null_keys,
587+ stream);
588+
589+ // Extract the populated indices from the hash set and create a gather map.
605590 // Gathering using this map from sparse results will give dense results.
606- auto gather_map = extract_populated_keys (*map , keys.num_rows (), stream);
591+ auto gather_map = extract_populated_keys (set , keys.num_rows (), stream);
607592
608593 // Compact all results from sparse_results and insert into cache
609594 sparse_to_dense_results (keys,
610595 requests,
611596 &sparse_results,
612597 cache,
613598 gather_map,
614- *map ,
599+ set. ref (cuco::find) ,
615600 keys_have_nulls,
616601 include_null_keys,
617602 stream,
0 commit comments