Skip to content

Commit c02afc9

Browse files
authored
[FEA] Add Batching to KMeans (#1886)
Merge after #1880. This PR adds support for streaming out-of-core (dataset on host) k-means clustering. The idea is simple: batched accumulation of centroid updates. Data is processed in batches, and batch-wise means and cluster counts are accumulated until all the batches — i.e., the full dataset pass — have completed. This PR just brings a batch-size parameter to load and compute cluster assignments and (weighted) centroid adjustments on batches of the dataset. The final centroid 'updates' — i.e., a single k-means iteration — only complete when all these accumulated sums are averaged once the whole dataset pass has finished. Authors: - Tarang Jain (https://github.com/tarang-jain) Approvers: - Victor Lafargue (https://github.com/viclafargue) - Anupam (https://github.com/aamijar) - Micka (https://github.com/lowener) - Jinsol Park (https://github.com/jinsolp) - Ben Frederickson (https://github.com/benfred) URL: #1886
1 parent a23bd4b commit c02afc9

12 files changed

Lines changed: 1306 additions & 157 deletions

File tree

c/include/cuvs/cluster/kmeans.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ typedef enum {
3636
Array = 2
3737
} cuvsKMeansInitMethod;
3838

39+
3940
/**
4041
* @brief Hyper-parameters for the kmeans algorithm
4142
*/
@@ -90,6 +91,7 @@ struct cuvsKMeansParams {
9091
*/
9192
int batch_centroids;
9293

94+
/** Check inertia during iterations for early convergence. */
9395
bool inertia_check;
9496

9597
/**
@@ -101,6 +103,12 @@ struct cuvsKMeansParams {
101103
* For hierarchical k-means , defines the number of training iterations
102104
*/
103105
int hierarchical_n_iters;
106+
107+
/**
108+
* Number of samples to process per GPU batch for the batched (host-data) API.
109+
* When set to 0, defaults to n_samples (process all at once).
110+
*/
111+
int64_t streaming_batch_size;
104112
};
105113

106114
typedef struct cuvsKMeansParams* cuvsKMeansParams_t;
@@ -142,18 +150,24 @@ typedef enum { CUVS_KMEANS_TYPE_KMEANS = 0, CUVS_KMEANS_TYPE_KMEANS_BALANCED = 1
142150
* clusters are reinitialized by choosing new centroids with
143151
* k-means++ algorithm.
144152
*
153+
* X may reside on either host (CPU) or device (GPU) memory.
154+
* When X is on the host the data is streamed to the GPU in
155+
* batches controlled by params->streaming_batch_size.
156+
*
145157
* @param[in] res opaque C handle
146158
* @param[in] params Parameters for KMeans model.
147159
* @param[in] X Training instances to cluster. The data must
148-
* be in row-major format.
160+
* be in row-major format. May be on host or
161+
* device memory.
149162
* [dim = n_samples x n_features]
150163
* @param[in] sample_weight Optional weights for each observation in X.
164+
* Must be on the same memory space as X.
151165
* [len = n_samples]
152166
* @param[inout] centroids [in] When init is InitMethod::Array, use
153167
* centroids as the initial cluster centers.
154168
* [out] The generated centroids from the
155169
* kmeans algorithm are stored at the address
156-
* pointed by 'centroids'.
170+
* pointed by 'centroids'. Must be on device.
157171
* [dim = n_clusters x n_features]
158172
* @param[out] inertia Sum of squared distances of samples to their
159173
* closest cluster center.
@@ -212,6 +226,7 @@ cuvsError_t cuvsKMeansClusterCost(cuvsResources_t res,
212226
DLManagedTensor* X,
213227
DLManagedTensor* centroids,
214228
double* cost);
229+
215230
/**
216231
* @}
217232
*/

c/src/cluster/kmeans.cpp

Lines changed: 75 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55

66
#include <cstdint>
7+
78
#include <dlpack/dlpack.h>
89

910
#include <cuvs/cluster/kmeans.h>
@@ -17,16 +18,18 @@ namespace {
1718

1819
cuvs::cluster::kmeans::params convert_params(const cuvsKMeansParams& params)
1920
{
20-
auto kmeans_params = cuvs::cluster::kmeans::params();
21-
kmeans_params.metric = static_cast<cuvs::distance::DistanceType>(params.metric);
22-
kmeans_params.init = static_cast<cuvs::cluster::kmeans::params::InitMethod>(params.init);
23-
kmeans_params.n_clusters = params.n_clusters;
24-
kmeans_params.max_iter = params.max_iter;
25-
kmeans_params.tol = params.tol;
21+
auto kmeans_params = cuvs::cluster::kmeans::params();
22+
kmeans_params.metric = static_cast<cuvs::distance::DistanceType>(params.metric);
23+
kmeans_params.init = static_cast<cuvs::cluster::kmeans::params::InitMethod>(params.init);
24+
kmeans_params.n_clusters = params.n_clusters;
25+
kmeans_params.max_iter = params.max_iter;
26+
kmeans_params.tol = params.tol;
27+
kmeans_params.n_init = params.n_init;
2628
kmeans_params.oversampling_factor = params.oversampling_factor;
2729
kmeans_params.batch_samples = params.batch_samples;
2830
kmeans_params.batch_centroids = params.batch_centroids;
2931
kmeans_params.inertia_check = params.inertia_check;
32+
kmeans_params.streaming_batch_size = params.streaming_batch_size;
3033
return kmeans_params;
3134
}
3235

@@ -38,7 +41,7 @@ cuvs::cluster::kmeans::balanced_params convert_balanced_params(const cuvsKMeansP
3841
return kmeans_params;
3942
}
4043

41-
template <typename T, typename IdxT = int32_t>
44+
template <typename T, typename IdxT = int64_t>
4245
void _fit(cuvsResources_t res,
4346
const cuvsKMeansParams& params,
4447
DLManagedTensor* X_tensor,
@@ -50,7 +53,51 @@ void _fit(cuvsResources_t res,
5053
auto X = X_tensor->dl_tensor;
5154
auto res_ptr = reinterpret_cast<raft::resources*>(res);
5255

53-
if (cuvs::core::is_dlpack_device_compatible(X)) {
56+
if (!cuvs::core::is_dlpack_device_compatible(X)) {
57+
auto n_samples = static_cast<IdxT>(X.shape[0]);
58+
auto n_features = static_cast<IdxT>(X.shape[1]);
59+
60+
if (params.hierarchical) {
61+
RAFT_FAIL("hierarchical kmeans is not supported with host data");
62+
}
63+
64+
auto centroids_dl = centroids_tensor->dl_tensor;
65+
if (!cuvs::core::is_dlpack_device_compatible(centroids_dl)) {
66+
RAFT_FAIL("centroids must be on device memory");
67+
}
68+
69+
auto X_view = raft::make_host_matrix_view<T const, IdxT>(
70+
reinterpret_cast<T const*>(X.data), n_samples, n_features);
71+
auto centroids_view =
72+
cuvs::core::from_dlpack<raft::device_matrix_view<T, IdxT, raft::row_major>>(
73+
centroids_tensor);
74+
75+
std::optional<raft::host_vector_view<T const, IdxT>> sample_weight;
76+
if (sample_weight_tensor != NULL) {
77+
auto sw = sample_weight_tensor->dl_tensor;
78+
if (!cuvs::core::is_dlpack_host_compatible(sw)) {
79+
RAFT_FAIL("sample_weight must be host accessible when X is on host");
80+
}
81+
sample_weight = raft::make_host_vector_view<T const, IdxT>(
82+
reinterpret_cast<T const*>(sw.data), n_samples);
83+
}
84+
85+
T inertia_temp;
86+
IdxT n_iter_temp;
87+
88+
auto kmeans_params = convert_params(params);
89+
cuvs::cluster::kmeans::fit(*res_ptr,
90+
kmeans_params,
91+
X_view,
92+
sample_weight,
93+
centroids_view,
94+
raft::make_host_scalar_view<T>(&inertia_temp),
95+
raft::make_host_scalar_view<IdxT>(&n_iter_temp));
96+
97+
*inertia = inertia_temp;
98+
*n_iter = n_iter_temp;
99+
100+
} else {
54101
using const_mdspan_type = raft::device_matrix_view<T const, IdxT, raft::row_major>;
55102
using mdspan_type = raft::device_matrix_view<T, IdxT, raft::row_major>;
56103

@@ -85,13 +132,11 @@ void _fit(cuvsResources_t res,
85132
cuvs::core::from_dlpack<const_mdspan_type>(X_tensor),
86133
sample_weight,
87134
cuvs::core::from_dlpack<mdspan_type>(centroids_tensor),
88-
raft::make_host_scalar_view<T, IdxT>(&inertia_temp),
89-
raft::make_host_scalar_view<IdxT, IdxT>(&n_iter_temp));
135+
raft::make_host_scalar_view<T>(&inertia_temp),
136+
raft::make_host_scalar_view<IdxT>(&n_iter_temp));
90137
*inertia = inertia_temp;
91138
*n_iter = n_iter_temp;
92139
}
93-
} else {
94-
RAFT_FAIL("X dataset must be accessible on device memory");
95140
}
96141
}
97142

@@ -143,7 +188,7 @@ void _predict(cuvsResources_t res,
143188
cuvs::core::from_dlpack<const_mdspan_type>(centroids_tensor),
144189
cuvs::core::from_dlpack<labels_mdspan_type>(labels_tensor),
145190
normalize_weight,
146-
raft::make_host_scalar_view<T, IdxT>(&inertia_temp));
191+
raft::make_host_scalar_view<T>(&inertia_temp));
147192
*inertia = inertia_temp;
148193
}
149194
} else {
@@ -168,7 +213,7 @@ void _cluster_cost(cuvsResources_t res,
168213
cuvs::cluster::kmeans::cluster_cost(*res_ptr,
169214
cuvs::core::from_dlpack<mdspan_type>(X_tensor),
170215
cuvs::core::from_dlpack<mdspan_type>(centroids_tensor),
171-
raft::make_host_scalar_view<T, IdxT>(&cost_temp));
216+
raft::make_host_scalar_view<T>(&cost_temp));
172217
} else {
173218
RAFT_FAIL("X dataset must be accessible on device memory");
174219
}
@@ -182,17 +227,20 @@ extern "C" cuvsError_t cuvsKMeansParamsCreate(cuvsKMeansParams_t* params)
182227
return cuvs::core::translate_exceptions([=] {
183228
cuvs::cluster::kmeans::params cpp_params;
184229
cuvs::cluster::kmeans::balanced_params cpp_balanced_params;
185-
*params =
186-
new cuvsKMeansParams{.metric = static_cast<cuvsDistanceType>(cpp_params.metric),
187-
.n_clusters = cpp_params.n_clusters,
188-
.init = static_cast<cuvsKMeansInitMethod>(cpp_params.init),
189-
.max_iter = cpp_params.max_iter,
190-
.tol = cpp_params.tol,
191-
.oversampling_factor = cpp_params.oversampling_factor,
192-
.batch_samples = cpp_params.batch_samples,
193-
.inertia_check = cpp_params.inertia_check,
194-
.hierarchical = false,
195-
.hierarchical_n_iters = static_cast<int>(cpp_balanced_params.n_iters)};
230+
*params = new cuvsKMeansParams{
231+
.metric = static_cast<cuvsDistanceType>(cpp_params.metric),
232+
.n_clusters = cpp_params.n_clusters,
233+
.init = static_cast<cuvsKMeansInitMethod>(cpp_params.init),
234+
.max_iter = cpp_params.max_iter,
235+
.tol = cpp_params.tol,
236+
.n_init = cpp_params.n_init,
237+
.oversampling_factor = cpp_params.oversampling_factor,
238+
.batch_samples = cpp_params.batch_samples,
239+
.batch_centroids = cpp_params.batch_centroids,
240+
.inertia_check = cpp_params.inertia_check,
241+
.hierarchical = false,
242+
.hierarchical_n_iters = static_cast<int>(cpp_balanced_params.n_iters),
243+
.streaming_batch_size = cpp_params.streaming_batch_size};
196244
});
197245
}
198246

@@ -235,10 +283,9 @@ extern "C" cuvsError_t cuvsKMeansPredict(cuvsResources_t res,
235283
return cuvs::core::translate_exceptions([=] {
236284
auto dataset = X->dl_tensor;
237285
if (dataset.dtype.code == kDLFloat && dataset.dtype.bits == 32) {
238-
_predict<float>(res, *params, X, sample_weight, centroids, labels, normalize_weight, inertia);
286+
_predict<float>(res, *params, X, sample_weight, centroids, labels, normalize_weight, inertia);
239287
} else if (dataset.dtype.code == kDLFloat && dataset.dtype.bits == 64) {
240-
_predict<double>(
241-
res, *params, X, sample_weight, centroids, labels, normalize_weight, inertia);
288+
_predict<double>(res, *params, X, sample_weight, centroids, labels, normalize_weight, inertia);
242289
} else {
243290
RAFT_FAIL("Unsupported dataset DLtensor dtype: %d and bits: %d",
244291
dataset.dtype.code,

cpp/include/cuvs/cluster/kmeans.hpp

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,31 @@ struct params : base_params {
100100
* useful to optimize/control the memory footprint
101101
* Default tile is [batch_samples x n_clusters] i.e. when batch_centroids is 0
102102
* then don't tile the centroids
103+
*
104+
* NB: These parameters are unrelated to streaming_batch_size, which controls how many
105+
* samples to transfer from host to device per batch when processing out-of-core
106+
* data.
103107
*/
104108
int batch_samples = 1 << 15;
105109

106110
/**
107111
* if 0 then batch_centroids = n_clusters
108112
*/
109-
int batch_centroids = 0; //
113+
int batch_centroids = 0;
110114

115+
/**
116+
* If true, check inertia during iterations for early convergence.
117+
*/
111118
bool inertia_check = false;
119+
120+
/**
121+
* Number of samples to process per GPU batch when fitting with host data.
122+
* When set to 0, defaults to n_samples (process all at once).
123+
* Only used by the batched (host-data) code path and ignored by device-data
124+
* overloads.
125+
* Default: 0 (process all data at once).
126+
*/
127+
int64_t streaming_batch_size = 0;
112128
};
113129

114130
/**
@@ -141,6 +157,82 @@ enum class kmeans_type { KMeans = 0, KMeansBalanced = 1 };
141157
* @{
142158
*/
143159

160+
/**
161+
* @brief Find clusters with k-means algorithm using batched processing of host data.
162+
*
163+
* TODO: Evaluate replacing the extent type with int64_t. Reference issue:
164+
* https://github.com/rapidsai/cuvs/issues/1961
165+
*
166+
* This overload supports out-of-core computation where the dataset resides
167+
* on the host. Data is processed in GPU-sized batches, streaming from host to device.
168+
* The batch size is controlled by params.streaming_batch_size.
169+
*
170+
* @code{.cpp}
171+
* #include <raft/core/resources.hpp>
172+
* #include <cuvs/cluster/kmeans.hpp>
173+
* using namespace cuvs::cluster;
174+
* ...
175+
* raft::resources handle;
176+
* cuvs::cluster::kmeans::params params;
177+
* params.n_clusters = 100;
178+
* params.streaming_batch_size = 100000;
179+
* float inertia;
180+
* int64_t n_iter;
181+
*
182+
* // Data on host
183+
* std::vector<float> h_X(n_samples * n_features);
184+
* auto X = raft::make_host_matrix_view<const float, int64_t>(h_X.data(), n_samples, n_features);
185+
*
186+
* // Centroids on device
187+
* auto centroids = raft::make_device_matrix<float, int64_t>(handle, params.n_clusters,
188+
* n_features);
189+
*
190+
* kmeans::fit(handle,
191+
* params,
192+
* X,
193+
* std::nullopt,
194+
* centroids.view(),
195+
* raft::make_host_scalar_view(&inertia),
196+
* raft::make_host_scalar_view(&n_iter));
197+
* @endcode
198+
*
199+
* @param[in] handle The raft handle.
200+
* @param[in] params Parameters for KMeans model. Batch size is read from
201+
* params.streaming_batch_size.
202+
* @param[in] X Training instances on HOST memory. The data must
203+
* be in row-major format.
204+
* [dim = n_samples x n_features]
205+
* @param[in] sample_weight Optional weights for each observation in X (on host).
206+
* [len = n_samples]
207+
* @param[inout] centroids [in] When init is InitMethod::Array, use
208+
* centroids as the initial cluster centers.
209+
* [out] The generated centroids from the
210+
* kmeans algorithm are stored at the address
211+
* pointed by 'centroids'.
212+
* [dim = n_clusters x n_features]
213+
* @param[out] inertia Sum of squared distances of samples to their
214+
* closest cluster center.
215+
* @param[out] n_iter Number of iterations run.
216+
*/
217+
void fit(raft::resources const& handle,
218+
const cuvs::cluster::kmeans::params& params,
219+
raft::host_matrix_view<const float, int64_t> X,
220+
std::optional<raft::host_vector_view<const float, int64_t>> sample_weight,
221+
raft::device_matrix_view<float, int64_t> centroids,
222+
raft::host_scalar_view<float> inertia,
223+
raft::host_scalar_view<int64_t> n_iter);
224+
225+
/**
226+
* @brief Find clusters with k-means algorithm using batched processing of host data.
227+
*/
228+
void fit(raft::resources const& handle,
229+
const cuvs::cluster::kmeans::params& params,
230+
raft::host_matrix_view<const double, int64_t> X,
231+
std::optional<raft::host_vector_view<const double, int64_t>> sample_weight,
232+
raft::device_matrix_view<double, int64_t> centroids,
233+
raft::host_scalar_view<double> inertia,
234+
raft::host_scalar_view<int64_t> n_iter);
235+
144236
/**
145237
* @brief Find clusters with k-means algorithm.
146238
* Initial centroids are chosen with k-means++ algorithm. Empty

0 commit comments

Comments
 (0)