Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/cuml/cuml/benchmark/automated/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ def setFixtureParamNames(*args, **kwargs):
transform,
)
from cuml.benchmark.nvtx_benchmark import Profiler
from cuml.dask._compat import DASK_2025_4_0


def distribute(client, data):
if data is not None:
n_rows = data.shape[0]
n_workers = len(client.scheduler_info()["workers"])
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
n_workers = len(client.scheduler_info(**kwargs)["workers"])
rows_per_chunk = math.ceil(n_rows / n_workers)
if isinstance(data, (np.ndarray, cp.ndarray)):
dask_array = da.from_array(
Expand Down
26 changes: 26 additions & 0 deletions python/cuml/cuml/dask/_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) 2025 NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools

import dask
import packaging.version


@functools.lru_cache
def DASK_2025_4_0():
    """Return True if the installed ``dask`` version is >= 2025.4.0.

    Used to gate API differences introduced in dask 2025.4.0 (e.g. the
    ``n_workers`` parameter of ``Client.scheduler_info``).  Cached because
    the installed dask version cannot change within a process.
    """
    return packaging.version.parse(
        dask.__version__
    ) >= packaging.version.parse("2025.4.0")
4 changes: 3 additions & 1 deletion python/cuml/cuml/dask/datasets/blobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import cuml.internals.logger as logger
from cuml.common import with_cupy_rmm
from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common.utils import get_client
from cuml.dask.datasets.utils import _create_delayed, _get_labels, _get_X
from cuml.datasets.blobs import _get_centers
Expand Down Expand Up @@ -145,7 +146,8 @@ def make_blobs(
generator = _create_rs_generator(random_state=random_state)

if workers is None:
workers = list(client.scheduler_info()["workers"].keys())
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
Comment thread
TomAugspurger marked this conversation as resolved.
workers = list(client.scheduler_info(**kwargs)["workers"].keys())

n_parts = n_parts if n_parts is not None else len(workers)
parts_workers = (workers * n_parts)[:n_parts]
Expand Down
4 changes: 3 additions & 1 deletion python/cuml/cuml/dask/datasets/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import numpy as np

from cuml.common import with_cupy_rmm
from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common.utils import get_client
from cuml.dask.datasets.utils import _create_delayed, _get_labels, _get_X
from cuml.datasets.classification import _generate_hypercube
Expand Down Expand Up @@ -194,7 +195,8 @@ def make_classification(

rs = _create_rs_generator(random_state)

workers = list(client.scheduler_info()["workers"].keys())
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
workers = list(client.scheduler_info(**kwargs)["workers"].keys())

n_parts = n_parts if n_parts is not None else len(workers)
parts_workers = (workers * n_parts)[:n_parts]
Expand Down
17 changes: 13 additions & 4 deletions python/cuml/cuml/dask/ensemble/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dask.distributed import Future

from cuml import using_output_type
from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common.input_utils import DistributedDataHandler, concatenate
from cuml.dask.common.utils import get_client, wait_and_raise_from_futures
from cuml.legacy.fil.fil import TreeliteModel
Expand Down Expand Up @@ -52,7 +53,10 @@ def _create_model(
self.client = get_client(client)
if workers is None:
# Default to all workers
workers = list(self.client.scheduler_info()["workers"].keys())
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
workers = list(
self.client.scheduler_info(**kwargs)["workers"].keys()
)
self.workers = workers
self._set_internal_model(None)
self.active_workers = list()
Expand Down Expand Up @@ -213,10 +217,15 @@ def _partial_inference(self, X, op_type, delayed, **kwargs):
pure=False,
)
)
partial_infs = dask.delayed(dask.array.concatenate)(
partial_infs, axis=1, allow_unknown_chunksizes=True
shape = (X.shape[0], 1, self.num_classes)
objs = [
dask.array.from_delayed(partial_inf, shape=shape, dtype=np.float32)
for partial_inf in partial_infs
]
result = dask.array.concatenate(
objs, axis=1, allow_unknown_chunksizes=True
)
return partial_infs
return result

def _predict_using_fil(self, X, delayed, **kwargs):
if self._get_internal_model() is None:
Expand Down
9 changes: 6 additions & 3 deletions python/cuml/cuml/dask/neighbors/kneighbors_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,12 @@ def score(self, X, y, convert_dtype=True):
-------
score
"""
y_pred = self.predict(X, convert_dtype=convert_dtype)
if not isinstance(y_pred, da.Array):
y_pred = y_pred.to_dask_array(lengths=True)
y_pred_plain = self.predict(X, convert_dtype=convert_dtype)
Comment thread
TomAugspurger marked this conversation as resolved.
if not isinstance(y_pred_plain, da.Array):
y_pred = y_pred_plain.to_dask_array(lengths=True)
else:
y_pred = y_pred_plain

if not isinstance(y, da.Array):
y = y.to_dask_array(lengths=True)
y_true = y.squeeze()
Expand Down
62 changes: 16 additions & 46 deletions python/cuml/cuml/dask/preprocessing/label.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
#

import cupy as cp
import cupyx
import dask

from cuml.common import rmm_cupy_ary
from cuml.dask.common.base import BaseEstimator
Expand Down Expand Up @@ -106,12 +104,12 @@ def _func_unique_classes(y):
return rmm_cupy_ary(cp.unique, y)

@staticmethod
def _func_xform(y, *, model):
    """Transform one partition *y* with the fitted local *model*.

    ``model`` is keyword-only so this function can be used directly with
    ``dask.array.map_blocks``, which passes the block positionally.
    """
    xform_in = rmm_cupy_ary(cp.asarray, y, dtype=y.dtype)
    return model.transform(xform_in)

@staticmethod
def _func_inv_xform(y, *, model, threshold):
    """Inverse-transform one partition *y* back to the original labels.

    *threshold* is forwarded to the local model's ``inverse_transform``.
    """
    y = rmm_cupy_ary(cp.asarray, y, dtype=y.dtype)
    return model.inverse_transform(y, threshold)

Expand Down Expand Up @@ -177,27 +175,13 @@ def transform(self, y):

arr : Dask.Array backed by CuPy arrays containing encoded labels
"""

parts = self.client.sync(_extract_partitions, y)

internal_model = self._get_internal_model()

xform_func = dask.delayed(LabelBinarizer._func_xform)
meta = rmm_cupy_ary(cp.zeros, 1)
if internal_model.sparse_output:
meta = cupyx.scipy.sparse.csr_matrix(meta)
f = [
dask.array.from_delayed(
xform_func(internal_model, part),
meta=meta,
dtype=cp.float32,
shape=(cp.nan, len(self.classes_)),
)
for w, part in parts
]

arr = dask.array.concatenate(f, axis=0, allow_unknown_chunksizes=True)
return arr
new_axis = 1 if y.ndim == 1 else None
return y.map_blocks(
LabelBinarizer._func_xform,
model=self._get_internal_model(),
dtype=cp.float32,
new_axis=new_axis,
)

def inverse_transform(self, y, threshold=None):
"""
Expand All @@ -216,24 +200,10 @@ def inverse_transform(self, y, threshold=None):

arr : Dask.Array backed by CuPy arrays containing original labels
"""

parts = self.client.sync(_extract_partitions, y)
inv_func = dask.delayed(LabelBinarizer._func_inv_xform)

dtype = self.classes_.dtype
meta = rmm_cupy_ary(cp.zeros, 1, dtype=dtype)

internal_model = self._get_internal_model()

f = [
dask.array.from_delayed(
inv_func(internal_model, part, threshold),
dtype=dtype,
shape=(cp.nan,),
meta=meta,
)
for w, part in parts
]

arr = dask.array.concatenate(f, axis=0, allow_unknown_chunksizes=True)
return arr
return y.map_blocks(
LabelBinarizer._func_inv_xform,
model=self._get_internal_model(),
dtype=y.dtype,
threshold=threshold,
drop_axis=1,
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import scipy.stats
from sklearn.neighbors import KNeighborsClassifier

from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common import utils as dask_utils
from cuml.testing.utils import (
array_equal,
Expand Down Expand Up @@ -70,7 +71,8 @@ def _prep_training_data(


def _scale_rows(client, nrows):
    """Return *nrows* multiplied by the number of workers in the cluster.

    Parameters
    ----------
    client : dask.distributed.Client
        Client connected to the cluster whose workers are counted.
    nrows : int
        Per-worker row count to scale.
    """
    # dask >= 2025.4.0 truncates scheduler_info() to a few workers by
    # default; pass n_workers=-1 there to keep the old "all workers"
    # behavior, and pass nothing on older dask that lacks the parameter.
    kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
    workers = list(client.scheduler_info(**kwargs)["workers"].keys())
    n_workers = len(workers)
    return n_workers * nrows

Expand Down
26 changes: 14 additions & 12 deletions python/cuml/cuml/tests/dask/test_dask_random_forest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from sklearn.metrics import accuracy_score, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

from cuml.dask._compat import DASK_2025_4_0
from cuml.dask.common import utils as dask_utils
from cuml.dask.ensemble import RandomForestClassifier as cuRFC_mg
from cuml.dask.ensemble import RandomForestRegressor as cuRFR_mg
Expand Down Expand Up @@ -71,7 +72,8 @@ def test_rf_classification_multi_class(partitions_per_worker, cluster):

# Use CUDA_VISIBLE_DEVICES to control the number of workers
c = Client(cluster)
n_workers = len(c.scheduler_info()["workers"])
kwargs = {"n_workers": -1} if DASK_2025_4_0() else {}
n_workers = len(c.scheduler_info(**kwargs)["workers"])

try:

Expand Down Expand Up @@ -125,7 +127,7 @@ def test_rf_classification_multi_class(partitions_per_worker, cluster):
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("partitions_per_worker", [5])
def test_rf_regression_dask_fil(partitions_per_worker, dtype, client):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

# Use CUDA_VISIBLE_DEVICES to control the number of workers
X, y = make_regression(
Expand Down Expand Up @@ -175,7 +177,7 @@ def test_rf_regression_dask_fil(partitions_per_worker, dtype, client):

@pytest.mark.parametrize("partitions_per_worker", [5])
def test_rf_classification_dask_array(partitions_per_worker, client):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

X, y = make_classification(
n_samples=n_workers * 2000,
Expand Down Expand Up @@ -214,7 +216,7 @@ def test_rf_classification_dask_array(partitions_per_worker, client):

@pytest.mark.parametrize("partitions_per_worker", [5])
def test_rf_regression_dask_cpu(partitions_per_worker, client):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

X, y = make_regression(
n_samples=n_workers * 2000,
Expand Down Expand Up @@ -263,7 +265,7 @@ def test_rf_regression_dask_cpu(partitions_per_worker, client):
def test_rf_classification_dask_fil_predict_proba(
partitions_per_worker, client
):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

X, y = make_classification(
n_samples=n_workers * 1500,
Expand Down Expand Up @@ -319,7 +321,7 @@ def test_rf_classification_dask_fil_predict_proba(

@pytest.mark.parametrize("model_type", ["classification", "regression"])
def test_rf_concatenation_dask(client, model_type):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

from cuml.legacy.fil.fil import TreeliteModel

Expand Down Expand Up @@ -367,7 +369,7 @@ def test_single_input_regression(client, ignore_empty_partitions):

if (
ignore_empty_partitions
or len(client.scheduler_info()["workers"].keys()) == 1
or len(client.scheduler_info(n_workers=-1)["workers"].keys()) == 1
):
cu_rf_mg.fit(X, y)
cuml_mod_predict = cu_rf_mg.predict(X)
Expand All @@ -384,7 +386,7 @@ def test_single_input_regression(client, ignore_empty_partitions):
@pytest.mark.parametrize("n_estimators", [5, 10, 20])
@pytest.mark.parametrize("estimator_type", ["regression", "classification"])
def test_rf_get_json(client, estimator_type, max_depth, n_estimators):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])
if n_estimators < n_workers:
err_msg = "n_estimators cannot be lower than number of dask workers"
pytest.xfail(err_msg)
Expand Down Expand Up @@ -486,7 +488,7 @@ def predict_with_json_rf_regressor(rf, x):
@pytest.mark.parametrize("max_depth", [1, 2, 3, 5, 10, 15, 20])
@pytest.mark.parametrize("n_estimators", [5, 10, 20])
def test_rf_instance_count(client, max_depth, n_estimators):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])
if n_estimators < n_workers:
err_msg = "n_estimators cannot be lower than number of dask workers"
pytest.xfail(err_msg)
Expand Down Expand Up @@ -548,7 +550,7 @@ def test_rf_get_combined_model_right_aftter_fit(client, estimator_type):
max_depth = 3
n_estimators = 5

n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])
if n_estimators < n_workers:
err_msg = "n_estimators cannot be lower than number of dask workers"
pytest.xfail(err_msg)
Expand Down Expand Up @@ -593,7 +595,7 @@ def test_rf_get_combined_model_right_aftter_fit(client, estimator_type):
@pytest.mark.parametrize("n_estimators", [5, 10, 20])
@pytest.mark.parametrize("detailed_text", [True, False])
def test_rf_get_text(client, n_estimators, detailed_text):
n_workers = len(client.scheduler_info()["workers"])
n_workers = len(client.scheduler_info(n_workers=-1)["workers"])

X, y = make_classification(
n_samples=500,
Expand Down Expand Up @@ -646,7 +648,7 @@ def test_rf_get_text(client, n_estimators, detailed_text):
@pytest.mark.parametrize("transform_broadcast", [True, False])
def test_rf_broadcast(model_type, fit_broadcast, transform_broadcast, client):
# Use CUDA_VISIBLE_DEVICES to control the number of workers
workers = list(client.scheduler_info()["workers"].keys())
workers = list(client.scheduler_info(n_workers=-1)["workers"].keys())
n_workers = len(workers)

if model_type == "classification":
Expand Down
Loading