diff --git a/ci/build_docs.sh b/ci/build_docs.sh index db0109015b8..668d52e530b 100755 --- a/ci/build_docs.sh +++ b/ci/build_docs.sh @@ -41,9 +41,6 @@ mkdir -p "${RAPIDS_DOCS_DIR}/libcudf/html" mv html/* "${RAPIDS_DOCS_DIR}/libcudf/html" popd -# TODO: Remove this once dask-expr works in the 10min notebook -export DASK_DATAFRAME__QUERY_PLANNING=False - rapids-logger "Build Python docs" pushd docs/cudf make dirhtml diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index 6def6e23b12..ff037b9520c 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -1,7 +1,11 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +import functools from dask_expr._cumulative import CumulativeBlockwise -from dask_expr._reductions import Var +from dask_expr._expr import Expr, VarColumns +from dask_expr._reductions import Reduction, Var + +from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty ## ## Custom expression patching @@ -25,19 +29,95 @@ def _kwargs(self) -> dict: CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs -# This patch accounts for differences between -# numpy and cupy behavior. It may make sense -# to move this logic upstream. -_dx_reduction_aggregate = Var.reduction_aggregate +# The upstream Var code uses `Series.values`, and relies on numpy +# for most of the logic. Unfortunately, cudf -> cupy conversion +# is not supported for data containing null values. Therefore, +# we must implement our own version of Var for now. This logic +# is mostly copied from dask-cudf. + + +class VarCudf(Reduction): + # Uses the parallel version of Welford's online algorithm (Chan '79) + # (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf) + _parameters = ["frame", "skipna", "ddof", "numeric_only", "split_every"] + _defaults = { + "skipna": True, + "ddof": 1, + "numeric_only": False, + "split_every": False, + } + + @functools.cached_property + def _meta(self): + return make_meta( + meta_nonempty(self.frame._meta).var( + skipna=self.skipna, numeric_only=self.numeric_only + ) + ) + + @property + def chunk_kwargs(self): + return dict(skipna=self.skipna, numeric_only=self.numeric_only) + + @property + def combine_kwargs(self): + return {} + + @property + def aggregate_kwargs(self): + return dict(ddof=self.ddof) + + @classmethod + def reduction_chunk(cls, x, skipna=True, numeric_only=False): + kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {} + if skipna or numeric_only: + n = x.count(**kwargs) + kwargs["skipna"] = skipna + avg = x.mean(**kwargs) + else: + # Not skipping nulls, so might as well + # avoid the full `count` operation + n = len(x) + kwargs["skipna"] = skipna + avg = x.sum(**kwargs) / n + if numeric_only: + # Workaround for cudf bug + # (see: https://github.com/rapidsai/cudf/issues/13731) + x = x[n.index] + m2 = ((x - avg) ** 2).sum(**kwargs) + return n, avg, m2 + + @classmethod + def reduction_combine(cls, parts): + n, avg, m2 = parts[0] + for i in range(1, len(parts)): + n_a, avg_a, m2_a = n, avg, m2 + n_b, avg_b, m2_b = parts[i] + n = n_a + n_b + avg = (n_a * avg_a + n_b * avg_b) / n + delta = avg_b - avg_a + m2 = m2_a + m2_b + delta**2 * n_a * n_b / n + return n, avg, m2 + + @classmethod + def reduction_aggregate(cls, vals, ddof=1): + vals = cls.reduction_combine(vals) + n, _, m2 = vals + return m2 / (n - ddof) -def _reduction_aggregate(*args, **kwargs): - result = _dx_reduction_aggregate(*args, **kwargs) - if result.ndim == 0: - # cupy will sometimes produce a 0d array, and - # we need to convert it to a scalar. - return result.item() - return result +def _patched_var( + self, axis=0, skipna=True, ddof=1, numeric_only=False, split_every=False +): + if axis == 0: + if hasattr(self._meta, "to_pandas"): + return VarCudf(self, skipna, ddof, numeric_only, split_every) + else: + return Var(self, skipna, ddof, numeric_only, split_every) + elif axis == 1: + return VarColumns(self, skipna, ddof, numeric_only) + else: + raise ValueError(f"axis={axis} not supported. Please specify 0 or 1") -Var.reduction_aggregate = staticmethod(_reduction_aggregate) +Expr.var = _patched_var diff --git a/python/dask_cudf/dask_cudf/tests/test_reductions.py b/python/dask_cudf/dask_cudf/tests/test_reductions.py index c3056f2607c..88b15718382 100644 --- a/python/dask_cudf/dask_cudf/tests/test_reductions.py +++ b/python/dask_cudf/dask_cudf/tests/test_reductions.py @@ -84,3 +84,13 @@ def test_rowwise_reductions(data, op): check_exact=False, check_dtype=op not in ("var", "std"), ) + + +@pytest.mark.parametrize("skipna", [True, False]) +def test_var_nulls(skipna): + # Copied from 10min example notebook + # See: https://github.com/rapidsai/cudf/pull/15347 + s = cudf.Series([1, 2, 3, None, 4]) + ds = dask_cudf.from_cudf(s, npartitions=2) + dd.assert_eq(s.var(skipna=skipna), ds.var(skipna=skipna)) + dd.assert_eq(s.std(skipna=skipna), ds.std(skipna=skipna))