Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 6 additions & 56 deletions python/cuml/cuml/dask/common/dask_arr_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,21 +77,7 @@ def _conv_array_to_sparse(arr):
def to_sparse_dask_array(cudf_or_array, client=None):
"""
Converts an array or cuDF to a sparse Dask array backed by sparse CuPy.
CSR matrices. Unfortunately, due to current limitations in Dask, there is
no direct path to convert a cupyx.scipy.sparse.spmatrix into a CuPy backed
dask.Array without copying to host.


NOTE: Until https://github.com/cupy/cupy/issues/2655 and
https://github.com/dask/dask/issues/5604 are implemented, compute()
will not be able to be called on a Dask.array that is backed with
sparse CuPy arrays because they lack the necessary functionality
to be stacked into a single array. The array returned from this
utility will, however, still be able to be passed into functions
that can make use of sparse CuPy-backed Dask.Array (eg. Distributed
Naive Bayes).

Relevant cuML issue: https://github.com/rapidsai/cuml/issues/1387
Csr matrices.

Parameters
----------
Expand All @@ -105,51 +91,15 @@ def to_sparse_dask_array(cudf_or_array, client=None):
-------
dask_array : dask.Array backed by cupyx.scipy.sparse.csr_matrix
"""
client = default_client() if client is None else client

# Makes sure the MatDescriptor workaround for CuPy sparse arrays
# is loaded (since Dask lazy-loaded serialization in cuML is only
# executed when object from the cuML package needs serialization.
# This can go away once the MatDescriptor pickling bug is fixed
# in CuPy.
# Ref: https://github.com/cupy/cupy/issues/3061
from cuml.comm import serialize # NOQA

ret = cudf_or_array
shape = cudf_or_array.shape

meta = cupyx.scipy.sparse.csr_matrix(rmm_cupy_ary(cp.zeros, 1))

ret = cudf_or_array

# If we have a Dask array, convert it to a Dask DataFrame
if isinstance(ret, dask.array.Array):
# At the time of developing this, using map_blocks will not work
# to convert a Dask.Array to CuPy sparse arrays underneath.

def _conv_np_to_df(x):
cupy_ary = rmm_cupy_ary(cp.asarray, x, dtype=x.dtype)
return cudf.DataFrame(cupy_ary)

parts = client.sync(_extract_partitions, ret)
futures = [
client.submit(_conv_np_to_df, part, workers=[w], pure=False)
for w, part in parts
]

ret = df_to_dask_cudf(futures)

# If we have a Dask Dataframe, use `map_partitions` to convert it
# to a Sparse Cupy-backed Dask Array. This will also convert the dense
# Dask array above to a Sparse Cupy-backed Dask Array, since we cannot
# use map_blocks on the array, but we can use `map_partitions` on the
# Dataframe.
if isinstance(ret, dask.dataframe.DataFrame):
ret = ret.map_partitions(
_conv_df_to_sparse, meta=dask.array.from_array(meta)
)
ret = ret.to_dask_array()

# This will also handle the input of dask.array.Array
return ret
if isinstance(cudf_or_array, dask.array.Array):
return cudf_or_array.map_blocks(_conv_array_to_sparse, meta=meta)

else:

Expand Down
Loading