Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions xarray/core/dask_array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,15 @@ def _fill_with_last_one(a, b):
axis=axis,
dtype=array.dtype,
)


def dask_array_rolling(padded, func, axis, window, min_count=None):
dtype = "f8" if not padded.dtype.kind == "f" else padded.dtype
return padded.data.map_overlap(
func,
depth={axis: (window - 1, 0)},
axis=axis,
dtype=dtype,
window=window,
min_count=min_count,
)
21 changes: 13 additions & 8 deletions xarray/core/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np
from packaging.version import Version

from xarray.core import dtypes, duck_array_ops, utils
from xarray.core import dask_array_ops, dtypes, duck_array_ops, utils
from xarray.core.arithmetic import CoarsenArithmetic
from xarray.core.options import OPTIONS, _get_keep_attrs
from xarray.core.types import CoarsenBoundaryOptions, SideOptions, T_Xarray
Expand Down Expand Up @@ -597,16 +597,18 @@ def _bottleneck_reduce(self, func, keep_attrs, **kwargs):
padded = padded.pad({self.dim[0]: (0, -shift)}, mode="constant")

if is_duck_dask_array(padded.data):
raise AssertionError("should not be reachable")
values = dask_array_ops.dask_array_rolling(
padded, func, axis, self.window[0], min_count
)
else:
values = func(
padded.data, window=self.window[0], min_count=min_count, axis=axis
)
# index 0 is at the rightmost edge of the window
# need to reverse index here
# see GH #8541
if func in [bottleneck.move_argmin, bottleneck.move_argmax]:
values = self.window[0] - 1 - values
# index 0 is at the rightmost edge of the window
# need to reverse index here
# see GH #8541
if func in [bottleneck.move_argmin, bottleneck.move_argmax]:
values = self.window[0] - 1 - values

if self.center[0]:
values = values[valid]
Expand Down Expand Up @@ -669,7 +671,10 @@ def _array_reduce(
if (
OPTIONS["use_bottleneck"]
and bottleneck_move_func is not None
and not is_duck_dask_array(self.obj.data)
and (
not is_duck_dask_array(self.obj.data)
or module_available("dask", "2024.11.0")
)
and self.ndim == 1
):
# TODO: re-enable bottleneck with dask after the issues
Expand Down
3 changes: 2 additions & 1 deletion xarray/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ def test_rolling_properties(self, da) -> None:
):
da.rolling(foo=2)

@requires_dask
@pytest.mark.parametrize(
"name", ("sum", "mean", "std", "min", "max", "median", "argmin", "argmax")
)
@pytest.mark.parametrize("center", (True, False, None))
@pytest.mark.parametrize("min_periods", (1, None))
@pytest.mark.parametrize("backend", ["numpy"], indirect=True)
@pytest.mark.parametrize("backend", ["numpy", "dask"], indirect=True)
def test_rolling_wrapped_bottleneck(
self, da, name, center, min_periods, compute_backend
) -> None:
Expand Down
Loading