diff --git a/python/cuml/cuml/dask/common/dask_arr_utils.py b/python/cuml/cuml/dask/common/dask_arr_utils.py index 4e1d72254c..7ca03d56b4 100644 --- a/python/cuml/cuml/dask/common/dask_arr_utils.py +++ b/python/cuml/cuml/dask/common/dask_arr_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2025, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,9 +15,7 @@ from cuml.common import rmm_cupy_ary, has_scipy -from cuml.dask.common.part_utils import _extract_partitions from dask.distributed import default_client -from cuml.dask.common.dask_df_utils import to_dask_cudf as df_to_dask_cudf from cuml.internals.memory_utils import with_cupy_rmm import dask.dataframe as dd import dask @@ -77,21 +75,7 @@ def _conv_array_to_sparse(arr): def to_sparse_dask_array(cudf_or_array, client=None): """ Converts an array or cuDF to a sparse Dask array backed by sparse CuPy. - CSR matrices. Unfortunately, due to current limitations in Dask, there is - no direct path to convert a cupyx.scipy.sparse.spmatrix into a CuPy backed - dask.Array without copying to host. - - - NOTE: Until https://github.com/cupy/cupy/issues/2655 and - https://github.com/dask/dask/issues/5604 are implemented, compute() - will not be able to be called on a Dask.array that is backed with - sparse CuPy arrays because they lack the necessary functionality - to be stacked into a single array. The array returned from this - utility will, however, still be able to be passed into functions - that can make use of sparse CuPy-backed Dask.Array (eg. Distributed - Naive Bayes). - - Relevant cuML issue: https://github.com/rapidsai/cuml/issues/1387 + Csr matrices. Parameters ---------- @@ -105,51 +89,15 @@ def to_sparse_dask_array(cudf_or_array, client=None): ------- dask_array : dask.Array backed by cupyx.scipy.sparse.csr_matrix """ - client = default_client() if client is None else client - - # Makes sure the MatDescriptor workaround for CuPy sparse arrays - # is loaded (since Dask lazy-loaded serialization in cuML is only - # executed when object from the cuML package needs serialization. - # This can go away once the MatDescriptor pickling bug is fixed - # in CuPy. - # Ref: https://github.com/cupy/cupy/issues/3061 - from cuml.comm import serialize # NOQA - + ret = cudf_or_array shape = cudf_or_array.shape - meta = cupyx.scipy.sparse.csr_matrix(rmm_cupy_ary(cp.zeros, 1)) - ret = cudf_or_array - - # If we have a Dask array, convert it to a Dask DataFrame - if isinstance(ret, dask.array.Array): - # At the time of developing this, using map_blocks will not work - # to convert a Dask.Array to CuPy sparse arrays underneath. - - def _conv_np_to_df(x): - cupy_ary = rmm_cupy_ary(cp.asarray, x, dtype=x.dtype) - return cudf.DataFrame(cupy_ary) - - parts = client.sync(_extract_partitions, ret) - futures = [ - client.submit(_conv_np_to_df, part, workers=[w], pure=False) - for w, part in parts - ] - - ret = df_to_dask_cudf(futures) - - # If we have a Dask Dataframe, use `map_partitions` to convert it - # to a Sparse Cupy-backed Dask Array. This will also convert the dense - # Dask array above to a Sparse Cupy-backed Dask Array, since we cannot - # use map_blocks on the array, but we can use `map_partitions` on the - # Dataframe. if isinstance(ret, dask.dataframe.DataFrame): - ret = ret.map_partitions( - _conv_df_to_sparse, meta=dask.array.from_array(meta) - ) + ret = ret.to_dask_array() - # This will also handle the input of dask.array.Array - return ret + if isinstance(cudf_or_array, dask.array.Array): + return cudf_or_array.map_blocks(_conv_array_to_sparse, meta=meta) else: