Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/cuml/cuml/benchmark/automated/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ def setFixtureParamNames(*args, **kwargs):
transform,
)
from cuml.benchmark.nvtx_benchmark import Profiler
from cuml.dask._compat import DASK_2025_4_0


def distribute(client, data):
if data is not None:
n_rows = data.shape[0]
n_workers = len(client.scheduler_info()["workers"])
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
n_workers = len(client.scheduler_info(**kwargs)["workers"])
rows_per_chunk = math.ceil(n_rows / n_workers)
if isinstance(data, (np.ndarray, cp.ndarray)):
dask_array = da.from_array(
Expand Down
26 changes: 26 additions & 0 deletions python/cuml/cuml/dask/_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) 2025 NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools

import dask
import packaging.version


@functools.lru_cache
def DASK_2025_4_0():
Comment thread
TomAugspurger marked this conversation as resolved.
return packaging.version.parse(
dask.__version__
) >= packaging.version.parse("2025.4.0")
4 changes: 3 additions & 1 deletion python/cuml/cuml/dask/datasets/blobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import cuml.internals.logger as logger
from cuml.common import with_cupy_rmm
from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common.utils import get_client
from cuml.dask.datasets.utils import _create_delayed, _get_labels, _get_X
from cuml.datasets.blobs import _get_centers
Expand Down Expand Up @@ -145,7 +146,8 @@ def make_blobs(
generator = _create_rs_generator(random_state=random_state)

if workers is None:
workers = list(client.scheduler_info()["workers"].keys())
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
Comment thread
TomAugspurger marked this conversation as resolved.
workers = list(client.scheduler_info(**kwargs)["workers"].keys())

n_parts = n_parts if n_parts is not None else len(workers)
parts_workers = (workers * n_parts)[:n_parts]
Expand Down
4 changes: 3 additions & 1 deletion python/cuml/cuml/dask/datasets/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import numpy as np

from cuml.common import with_cupy_rmm
from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common.utils import get_client
from cuml.dask.datasets.utils import _create_delayed, _get_labels, _get_X
from cuml.datasets.classification import _generate_hypercube
Expand Down Expand Up @@ -194,7 +195,8 @@ def make_classification(

rs = _create_rs_generator(random_state)

workers = list(client.scheduler_info()["workers"].keys())
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
workers = list(client.scheduler_info(**kwargs)["workers"].keys())

n_parts = n_parts if n_parts is not None else len(workers)
parts_workers = (workers * n_parts)[:n_parts]
Expand Down
17 changes: 13 additions & 4 deletions python/cuml/cuml/dask/ensemble/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dask.distributed import Future

from cuml import using_output_type
from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common.input_utils import DistributedDataHandler, concatenate
from cuml.dask.common.utils import get_client, wait_and_raise_from_futures
from cuml.legacy.fil.fil import TreeliteModel
Expand Down Expand Up @@ -52,7 +53,10 @@ def _create_model(
self.client = get_client(client)
if workers is None:
# Default to all workers
workers = list(self.client.scheduler_info()["workers"].keys())
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
workers = list(
self.client.scheduler_info(**kwargs)["workers"].keys()
)
self.workers = workers
self._set_internal_model(None)
self.active_workers = list()
Expand Down Expand Up @@ -213,10 +217,15 @@ def _partial_inference(self, X, op_type, delayed, **kwargs):
pure=False,
)
)
partial_infs = dask.delayed(dask.array.concatenate)(
partial_infs, axis=1, allow_unknown_chunksizes=True
shape = (X.shape[0], 1, self.num_classes)
objs = [
dask.array.from_delayed(partial_inf, shape=shape, dtype=np.float32)
for partial_inf in partial_infs
]
result = dask.array.concatenate(
objs, axis=1, allow_unknown_chunksizes=True
)
return partial_infs
return result

def _predict_using_fil(self, X, delayed, **kwargs):
if self._get_internal_model() is None:
Expand Down
9 changes: 6 additions & 3 deletions python/cuml/cuml/dask/neighbors/kneighbors_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,12 @@ def score(self, X, y, convert_dtype=True):
-------
score
"""
y_pred = self.predict(X, convert_dtype=convert_dtype)
if not isinstance(y_pred, da.Array):
y_pred = y_pred.to_dask_array(lengths=True)
y_pred_plain = self.predict(X, convert_dtype=convert_dtype)
Comment thread
TomAugspurger marked this conversation as resolved.
if not isinstance(y_pred_plain, da.Array):
y_pred = y_pred_plain.to_dask_array(lengths=True)
else:
y_pred = y_pred_plain

if not isinstance(y, da.Array):
y = y.to_dask_array(lengths=True)
y_true = y.squeeze()
Expand Down
62 changes: 16 additions & 46 deletions python/cuml/cuml/dask/preprocessing/label.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
#

import cupy as cp
import cupyx
import dask

from cuml.common import rmm_cupy_ary
from cuml.dask.common.base import BaseEstimator
Expand Down Expand Up @@ -106,12 +104,12 @@ def _func_unique_classes(y):
return rmm_cupy_ary(cp.unique, y)

@staticmethod
def _func_xform(model, y):
def _func_xform(y, *, model):
Comment thread
TomAugspurger marked this conversation as resolved.
xform_in = rmm_cupy_ary(cp.asarray, y, dtype=y.dtype)
return model.transform(xform_in)

@staticmethod
def _func_inv_xform(model, y, threshold):
def _func_inv_xform(y, *, model, threshold):
y = rmm_cupy_ary(cp.asarray, y, dtype=y.dtype)
return model.inverse_transform(y, threshold)

Expand Down Expand Up @@ -177,27 +175,13 @@ def transform(self, y):

arr : Dask.Array backed by CuPy arrays containing encoded labels
"""

parts = self.client.sync(_extract_partitions, y)

internal_model = self._get_internal_model()

xform_func = dask.delayed(LabelBinarizer._func_xform)
meta = rmm_cupy_ary(cp.zeros, 1)
if internal_model.sparse_output:
meta = cupyx.scipy.sparse.csr_matrix(meta)
f = [
dask.array.from_delayed(
xform_func(internal_model, part),
meta=meta,
dtype=cp.float32,
shape=(cp.nan, len(self.classes_)),
)
for w, part in parts
]

arr = dask.array.concatenate(f, axis=0, allow_unknown_chunksizes=True)
return arr
new_axis = 1 if y.ndim == 1 else None
return y.map_blocks(
LabelBinarizer._func_xform,
model=self._get_internal_model(),
dtype=cp.float32,
new_axis=new_axis,
)

def inverse_transform(self, y, threshold=None):
"""
Expand All @@ -216,24 +200,10 @@ def inverse_transform(self, y, threshold=None):

arr : Dask.Array backed by CuPy arrays containing original labels
"""

parts = self.client.sync(_extract_partitions, y)
inv_func = dask.delayed(LabelBinarizer._func_inv_xform)

dtype = self.classes_.dtype
meta = rmm_cupy_ary(cp.zeros, 1, dtype=dtype)

internal_model = self._get_internal_model()

f = [
dask.array.from_delayed(
inv_func(internal_model, part, threshold),
dtype=dtype,
shape=(cp.nan,),
meta=meta,
)
for w, part in parts
]

arr = dask.array.concatenate(f, axis=0, allow_unknown_chunksizes=True)
return arr
return y.map_blocks(
LabelBinarizer._func_inv_xform,
model=self._get_internal_model(),
dtype=y.dtype,
threshold=threshold,
drop_axis=1,
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import scipy.stats
from sklearn.neighbors import KNeighborsClassifier

from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common import utils as dask_utils
from cuml.testing.utils import (
array_equal,
Expand Down Expand Up @@ -70,7 +71,8 @@ def _prep_training_data(


def _scale_rows(client, nrows):
workers = list(client.scheduler_info()["workers"].keys())
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
workers = list(client.scheduler_info(**kwargs)["workers"].keys())
n_workers = len(workers)
return n_workers * nrows

Expand Down
26 changes: 14 additions & 12 deletions python/cuml/cuml/tests/dask/test_dask_random_forest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from sklearn.metrics import accuracy_score, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common import utils as dask_utils
from cuml.dask.ensemble import RandomForestClassifier as cuRFC_mg
from cuml.dask.ensemble import RandomForestRegressor as cuRFR_mg
Expand Down Expand Up @@ -71,7 +72,8 @@ def test_rf_classification_multi_class(partitions_per_worker, cluster):

# Use CUDA_VISIBLE_DEVICES to control the number of workers
c = Client(cluster)
n_workers = len(c.scheduler_info()["workers"])
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
n_workers = len(c.scheduler_info(**kwargs)["workers"])

try:

Expand Down Expand Up @@ -125,7 +127,7 @@ def test_rf_classification_multi_class(partitions_per_worker, cluster):
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("partitions_per_worker", [5])
def test_rf_regression_dask_fil(partitions_per_worker, dtype, client):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

# Use CUDA_VISIBLE_DEVICES to control the number of workers
X, y = make_regression(
Expand Down Expand Up @@ -175,7 +177,7 @@ def test_rf_regression_dask_fil(partitions_per_worker, dtype, client):

@pytest.mark.parametrize("partitions_per_worker", [5])
def test_rf_classification_dask_array(partitions_per_worker, client):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

X, y = make_classification(
n_samples=n_workers * 2000,
Expand Down Expand Up @@ -214,7 +216,7 @@ def test_rf_classification_dask_array(partitions_per_worker, client):

@pytest.mark.parametrize("partitions_per_worker", [5])
def test_rf_regression_dask_cpu(partitions_per_worker, client):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

X, y = make_regression(
n_samples=n_workers * 2000,
Expand Down Expand Up @@ -263,7 +265,7 @@ def test_rf_regression_dask_cpu(partitions_per_worker, client):
def test_rf_classification_dask_fil_predict_proba(
partitions_per_worker, client
):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

X, y = make_classification(
n_samples=n_workers * 1500,
Expand Down Expand Up @@ -319,7 +321,7 @@ def test_rf_classification_dask_fil_predict_proba(

@pytest.mark.parametrize("model_type", ["classification", "regression"])
def test_rf_concatenation_dask(client, model_type):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

from cuml.legacy.fil.fil import TreeliteModel

Expand Down Expand Up @@ -367,7 +369,7 @@ def test_single_input_regression(client, ignore_empty_partitions):

if (
ignore_empty_partitions
or len(client.scheduler_info()["workers"].keys()) == 1
or len(client.scheduler_info(n_workers=-1)["workers"].keys()) == 1
):
cu_rf_mg.fit(X, y)
cuml_mod_predict = cu_rf_mg.predict(X)
Expand All @@ -384,7 +386,7 @@ def test_single_input_regression(client, ignore_empty_partitions):
@pytest.mark.parametrize("n_estimators", [5, 10, 20])
@pytest.mark.parametrize("estimator_type", ["regression", "classification"])
def test_rf_get_json(client, estimator_type, max_depth, n_estimators):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])
if n_estimators < n_workers:
err_msg = "n_estimators cannot be lower than number of dask workers"
pytest.xfail(err_msg)
Expand Down Expand Up @@ -486,7 +488,7 @@ def predict_with_json_rf_regressor(rf, x):
@pytest.mark.parametrize("max_depth", [1, 2, 3, 5, 10, 15, 20])
@pytest.mark.parametrize("n_estimators", [5, 10, 20])
def test_rf_instance_count(client, max_depth, n_estimators):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])
if n_estimators < n_workers:
err_msg = "n_estimators cannot be lower than number of dask workers"
pytest.xfail(err_msg)
Expand Down Expand Up @@ -548,7 +550,7 @@ def test_rf_get_combined_model_right_aftter_fit(client, estimator_type):
max_depth = 3
n_estimators = 5

n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])
if n_estimators < n_workers:
err_msg = "n_estimators cannot be lower than number of dask workers"
pytest.xfail(err_msg)
Expand Down Expand Up @@ -593,7 +595,7 @@ def test_rf_get_combined_model_right_aftter_fit(client, estimator_type):
@pytest.mark.parametrize("n_estimators", [5, 10, 20])
@pytest.mark.parametrize("detailed_text", [True, False])
def test_rf_get_text(client, n_estimators, detailed_text):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

X, y = make_classification(
n_samples=500,
Expand Down Expand Up @@ -646,7 +648,7 @@ def test_rf_get_text(client, n_estimators, detailed_text):
@pytest.mark.parametrize("transform_broadcast", [True, False])
def test_rf_broadcast(model_type, fit_broadcast, transform_broadcast, client):
# Use CUDA_VISIBLE_DEVICES to control the number of workers
workers = list(client.scheduler_info()["workers"].keys())
workers = list(client.scheduler_info(n_workers=-1)["workers"].keys())
n_workers = len(workers)

if model_type == "classification":
Expand Down
Loading