Merged
28 changes: 24 additions & 4 deletions python/cuml/cuml/dask/cluster/kmeans.py
@@ -159,10 +159,30 @@ def fit(self, X, sample_weight=None):

         comms.destroy()

-        models = [res.result() for res in kmeans_fit]
-        first = models[0]
-        first.labels_ = cp.concatenate([model.labels_ for model in models])
-        first.inertia_ = sum(model.inertia_ for model in models)
+        # Collect the full model from only the first worker (for
+        # cluster_centers_ etc). Extract labels_ and inertia_ from the
+        # remaining workers remotely to avoid pulling N redundant copies
+        # of cluster_centers_ back to the client.
+        first = kmeans_fit[0].result()
+
+        remote_labels = [

Contributor:
Thank you for putting up a PR fix so quickly!

May I know how large this remote_labels variable is if dataset has 1 billion rows? Will that blow up scheduler memory?

Member:

labels_ is a 1D array of length n_samples (in the dask case, split across N-workers). The dtype is typically int32, which brings you to 4 GiB for the array total.

In most deployments of dask the data doesn't go through the scheduler, it goes directly worker->client (also note that in cases where the scheduler runs on the same node as the client this distinction is meaningless). So you care more about the memory capacity client-side than on the scheduler itself.
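For concreteness, the arithmetic behind that figure (a sketch; the 1-billion-row size is the hypothetical from the question above):

```python
# Back-of-envelope memory estimate for labels_ gathered to the client.
n_samples = 1_000_000_000   # hypothetical 1-billion-row dataset
bytes_per_label = 4         # int32 dtype
total_gib = n_samples * bytes_per_label / 2**30
print(f"{total_gib:.2f} GiB")  # prints: 3.73 GiB
```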

Contributor:

Thanks for clarifying! In my deployment, the scheduler does run on the same node as the client, so they share the same GPU memory. That said, since it's a 1D int32 array, 4 GB seems manageable compared to the previous issue of collecting all workers' copies of the centroid matrix.

+            self.client.submit(getattr, f, "labels_", workers=[w])
+            for f, (w, _) in zip(
+                kmeans_fit[1:], list(data.worker_to_parts.items())[1:]
+            )
+        ]
+        remote_inertias = [
+            self.client.submit(getattr, f, "inertia_", workers=[w])
+            for f, (w, _) in zip(
+                kmeans_fit[1:], list(data.worker_to_parts.items())[1:]
+            )
+        ]
+
+        all_labels = [first.labels_] + self.client.gather(remote_labels)
+        all_inertias = [first.inertia_] + self.client.gather(remote_inertias)
Member:

There's an overhead to tasks - it'd be better to submit one task that returns a tuple of (labels, inertia) than 2 simple tasks.


+        first.labels_ = cp.concatenate(all_labels)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the target file and the specific lines mentioned
head -n 20 python/cuml/cuml/dask/cluster/kmeans.py | tail -n 10

Repository: rapidsai/cuml

🏁 Script executed:

# Check the full context around lines 181-184
sed -n '1,20p' python/cuml/cuml/dask/cluster/kmeans.py

🏁 Script executed:

# Check if the file exists and get line count
wc -l python/cuml/cuml/dask/cluster/kmeans.py

🏁 Script executed:

# Read the relevant section around lines 181-184
sed -n '170,190p' python/cuml/cuml/dask/cluster/kmeans.py

🏁 Script executed:

# Check imports at the top of the file
sed -n '1,30p' python/cuml/cuml/dask/cluster/kmeans.py

🏁 Script executed:

# Now let's examine the helper function in input_utils.py
cat python/cuml/cuml/dask/common/input_utils.py | wc -l

🏁 Script executed:

# Check the helper function definition
sed -n '170,200p' python/cuml/cuml/dask/common/input_utils.py

🏁 Script executed:

# Get more context on the concatenate function
sed -n '175,195p' python/cuml/cuml/dask/common/input_utils.py


Use the imported concatenate() helper to preserve input type and avoid unnecessary copies.

Line 184 hard-codes cp.concatenate(), which causes type coercion when labels_ is a cuDF object and allocates a new buffer even in the single-worker case. The already-imported concatenate() from cuml.dask.common.input_utils preserves the input type (cuDF, CuPy, or NumPy) and skips the copy when there is only one shard.

Suggested fix
-        all_labels = [first.labels_] + self.client.gather(remote_labels)
-        all_inertias = [first.inertia_] + self.client.gather(remote_inertias)
+        all_labels = [first.labels_, *self.client.gather(remote_labels)]
+        all_inertias = [first.inertia_, *self.client.gather(remote_inertias)]

-        first.labels_ = cp.concatenate(all_labels)
+        first.labels_ = concatenate(all_labels, axis=0)
         first.inertia_ = sum(all_inertias)
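For illustration, a minimal, hypothetical sketch of what a type-preserving concatenate buys over a hard-coded `cp.concatenate` (the real helper lives in `cuml.dask.common.input_utils` and dispatches on cuDF/CuPy/NumPy; `concatenate_preserving` and `LabelList` here are invented stand-ins):

```python
# Sketch of a type-preserving concatenate: keep the input's own type
# and skip the copy entirely when there is only a single shard.
def concatenate_preserving(objs):
    if len(objs) == 1:
        return objs[0]              # single shard: return as-is, no copy
    out = type(objs[0])()           # build the result with the input's own type
    for obj in objs:
        out.extend(obj)
    return out

class LabelList(list):              # stand-in for a cuDF/CuPy container type
    pass

shards = [LabelList([0, 1]), LabelList([1, 0])]
merged = concatenate_preserving(shards)
print(type(merged).__name__, merged)         # prints: LabelList [0, 1, 1, 0]
single = concatenate_preserving([shards[0]])
print(single is shards[0])                   # prints: True (no extra allocation)
```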
🧰 Tools
🪛 Ruff (0.15.6)

[warning] 181-181: Consider iterable unpacking instead of concatenation

Replace with iterable unpacking

(RUF005)


[warning] 182-182: Consider iterable unpacking instead of concatenation

Replace with iterable unpacking

(RUF005)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/cuml/cuml/dask/cluster/kmeans.py` around lines 181 - 184, Replace the
hard-coded cp.concatenate call with the imported concatenate helper to preserve
input type and avoid unnecessary copies: where the code builds all_labels and
sets first.labels_ (currently using cp.concatenate(all_labels)), call
concatenate(all_labels) instead (the helper is imported from
cuml.dask.common.input_utils) so cuDF/CuPy/NumPy inputs keep their original
types and the single-shard path avoids an extra allocation.

+        first.inertia_ = sum(all_inertias)

⚠️ Potential issue | 🟠 Major

Please add regression tests for the new fit aggregation path.

This is a behavior-critical bug fix; add coverage for both single-worker and multi-worker fit to assert correct labels_/inertia_ aggregation and prevent reintroducing redundant model materialization.

As per coding guidelines **/*.py: Update unit tests when making code changes.
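A sketch of the aggregation invariant such a regression test would pin down, with plain lists standing in for per-worker GPU arrays (the `aggregate` helper is illustrative, not cuML code):

```python
# The single-worker and multi-worker fit paths must agree: concatenated
# per-worker labels_ and summed per-worker inertia_ reproduce the result
# of fitting on one worker that holds all the data.
def aggregate(parts):
    """parts: list of (labels, inertia) tuples, one per worker."""
    labels = [lbl for part_labels, _ in parts for lbl in part_labels]
    inertia = sum(i for _, i in parts)
    return labels, inertia

single = aggregate([([0, 1, 1, 0], 10.0)])
multi = aggregate([([0, 1], 4.0), ([1, 0], 6.0)])
assert single == multi == ([0, 1, 1, 0], 10.0)
print("aggregation consistent:", multi)
```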

🧰 Tools
🪛 Ruff (0.15.6)

[warning] 170-172: zip() without an explicit strict= parameter

Add explicit value for parameter strict=

(B905)


[warning] 176-178: zip() without an explicit strict= parameter

Add explicit value for parameter strict=

(B905)


[warning] 181-181: Consider iterable unpacking instead of concatenation

Replace with iterable unpacking

(RUF005)


[warning] 182-182: Consider iterable unpacking instead of concatenation

Replace with iterable unpacking

(RUF005)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/cuml/cuml/dask/cluster/kmeans.py` around lines 162 - 185, Add
regression tests that exercise the new fit aggregation path used in kmeans (the
block that builds `first = kmeans_fit[0].result()`, then remotely fetches
`labels_`/`inertia_` via `self.client.submit(getattr, f, "labels_", ...)` /
`getattr(..., "inertia_", ...)`, gathers them with `self.client.gather`, and
concatenates/sums into `first.labels_`/`first.inertia_`). Create two tests: one
that runs fit on a single-worker Dask cluster and verifies `labels_` and
`inertia_` match the non-distributed scikit-learn/cuml reference, and one that
runs on a multi-worker cluster and asserts the aggregated `labels_` (after
`cp.concatenate`) and aggregated `inertia_` (after sum) equal the expected
global values and that only the first worker materializes the full model (no
redundant cluster_centers_ copies). Ensure tests use the same api symbols
(`kmeans_fit`, `first`, `labels_`, `inertia_`, `client.gather`) so future
changes to aggregation logic are covered.

         self._set_internal_model(first)

         return self