Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions ci/build_docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ mkdir -p "${RAPIDS_DOCS_DIR}/libcudf/html"
mv html/* "${RAPIDS_DOCS_DIR}/libcudf/html"
popd

# TODO: Remove this once dask-expr works in the 10min notebook
export DASK_DATAFRAME__QUERY_PLANNING=False

rapids-logger "Build Python docs"
pushd docs/cudf
make dirhtml
Expand Down
106 changes: 93 additions & 13 deletions python/dask_cudf/dask_cudf/expr/_expr.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
import functools

from dask_expr._cumulative import CumulativeBlockwise
from dask_expr._reductions import Var
from dask_expr._expr import Expr, VarColumns
from dask_expr._reductions import Reduction, Var

from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty

##
## Custom expression patching
Expand All @@ -25,19 +29,95 @@ def _kwargs(self) -> dict:
CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs


# This patch accounts for differences between
# numpy and cupy behavior. It may make sense
# to move this logic upstream.
_dx_reduction_aggregate = Var.reduction_aggregate
# The upstream Var code uses `Series.values`, and relies on numpy
# for most of the logic. Unfortunately, cudf -> cupy conversion
# is not supported for data containing null values. Therefore,
# we must implement our own version of Var for now. This logic
is mostly copied from the legacy (non-expression) dask-cudf implementation.


class VarCudf(Reduction):
    """Null-safe variance reduction for cudf-backed collections.

    Upstream ``Var`` relies on ``Series.values`` (a cupy conversion) which
    fails for cudf data containing nulls, so this expression computes the
    variance with cudf-level ``count``/``mean``/``sum`` calls instead.
    """

    # Uses the parallel version of Welford's online algorithm (Chan '79)
    # (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf)
    # NOTE: order of ``_parameters`` must match the positional arguments
    # used to construct this expression (see the axis==0 branch of the
    # patched ``var`` dispatcher).
    _parameters = ["frame", "skipna", "ddof", "numeric_only", "split_every"]
    _defaults = {
        "skipna": True,
        "ddof": 1,
        "numeric_only": False,
        "split_every": False,
    }

    @functools.cached_property
    def _meta(self):
        # Derive output metadata by running ``var`` on a non-empty version
        # of the input metadata, then converting back to an empty meta object.
        return make_meta(
            meta_nonempty(self.frame._meta).var(
                skipna=self.skipna, numeric_only=self.numeric_only
            )
        )

    @property
    def chunk_kwargs(self):
        # Keyword arguments forwarded to ``reduction_chunk``.
        return dict(skipna=self.skipna, numeric_only=self.numeric_only)

    @property
    def combine_kwargs(self):
        # ``reduction_combine`` takes no extra keyword arguments.
        return {}

    @property
    def aggregate_kwargs(self):
        # Keyword arguments forwarded to ``reduction_aggregate``.
        return dict(ddof=self.ddof)

    @classmethod
    def reduction_chunk(cls, x, skipna=True, numeric_only=False):
        """Compute per-partition statistics ``(count, mean, M2)``.

        Returns the triple consumed by ``reduction_combine``:
        element count ``n``, mean ``avg``, and sum of squared
        deviations from the mean ``m2``.
        """
        # ``numeric_only`` is only meaningful for DataFrame inputs.
        kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {}
        if skipna or numeric_only:
            n = x.count(**kwargs)
            kwargs["skipna"] = skipna
            avg = x.mean(**kwargs)
        else:
            # Not skipping nulls, so might as well
            # avoid the full `count` operation
            n = len(x)
            kwargs["skipna"] = skipna
            avg = x.sum(**kwargs) / n
        if numeric_only:
            # Workaround for cudf bug
            # (see: https://github.com/rapidsai/cudf/issues/13731)
            x = x[n.index]
        m2 = ((x - avg) ** 2).sum(**kwargs)
        return n, avg, m2

    @classmethod
    def reduction_combine(cls, parts):
        """Merge a list of ``(n, avg, m2)`` triples into one.

        Pairwise application of Chan's parallel update formulas:
        the combined mean is the count-weighted mean, and the combined
        M2 adds a cross term proportional to the squared mean difference.
        """
        n, avg, m2 = parts[0]
        for i in range(1, len(parts)):
            n_a, avg_a, m2_a = n, avg, m2
            n_b, avg_b, m2_b = parts[i]
            n = n_a + n_b
            avg = (n_a * avg_a + n_b * avg_b) / n
            delta = avg_b - avg_a
            m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
        return n, avg, m2

    @classmethod
    def reduction_aggregate(cls, vals, ddof=1):
        """Combine all partial results and return ``M2 / (n - ddof)``."""
        vals = cls.reduction_combine(vals)
        n, _, m2 = vals
        return m2 / (n - ddof)


def _reduction_aggregate(*args, **kwargs):
    """Run the upstream ``Var`` aggregate, unwrapping 0-d array results.

    cupy will sometimes produce a 0-d array where a scalar is expected,
    so convert that case to a Python scalar via ``.item()``.
    """
    out = _dx_reduction_aggregate(*args, **kwargs)
    return out.item() if out.ndim == 0 else out
def _patched_var(
    self, axis=0, skipna=True, ddof=1, numeric_only=False, split_every=False
):
    """Dispatch ``var`` to the appropriate reduction expression.

    ``axis=0`` selects the null-safe ``VarCudf`` for cudf-backed data
    (detected via a ``to_pandas`` attribute on the metadata) and the
    upstream ``Var`` otherwise; ``axis=1`` uses ``VarColumns``.
    """
    if axis == 1:
        return VarColumns(self, skipna, ddof, numeric_only)
    if axis != 0:
        raise ValueError(f"axis={axis} not supported. Please specify 0 or 1")
    # cudf-backed metadata exposes ``to_pandas``; use the null-safe path.
    reduction = VarCudf if hasattr(self._meta, "to_pandas") else Var
    return reduction(self, skipna, ddof, numeric_only, split_every)


# Monkey-patch the upstream classes: fix 0-d cupy results from
# ``Var.reduction_aggregate`` and route ``Expr.var`` through the
# cudf-aware dispatcher above.
Var.reduction_aggregate = staticmethod(_reduction_aggregate)
Expr.var = _patched_var
10 changes: 10 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,13 @@ def test_rowwise_reductions(data, op):
check_exact=False,
check_dtype=op not in ("var", "std"),
)


@pytest.mark.parametrize("skipna", [True, False])
def test_var_nulls(skipna):
    """Check var/std on a null-containing series matches cudf, both skipna modes."""
    # Copied from 10min example notebook
    # See: https://github.com/rapidsai/cudf/pull/15347
    s = cudf.Series([1, 2, 3, None, 4])
    ds = dask_cudf.from_cudf(s, npartitions=2)
    dd.assert_eq(s.var(skipna=skipna), ds.var(skipna=skipna))
    dd.assert_eq(s.std(skipna=skipna), ds.std(skipna=skipna))