Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions datashader/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from datashader.antialias import AntialiasCombination, AntialiasStage2
from datashader.utils import isminus1, isnull
from numba import cuda as nb_cuda
from numba.typed import List

try:
from datashader.transfer_functions._cuda_utils import (
Expand Down Expand Up @@ -1868,7 +1867,16 @@ def _build_combine(self, dshape, antialias, cuda, partitioned):
invalid = isminus1 if self.selector.uses_row_index(cuda, partitioned) else isnull

@ngjit
def combine_cpu(aggs, selector_aggs):
def combine_cpu_2d(aggs, selector_aggs):
ny, nx = aggs[0].shape
for y in range(ny):
for x in range(nx):
value = selector_aggs[1][y, x]
if not invalid(value) and append(x, y, selector_aggs[0], value) >= 0:
aggs[0][y, x] = aggs[1][y, x]

@ngjit
def combine_cpu_3d(aggs, selector_aggs):
ny, nx, ncat = aggs[0].shape
for y in range(ny):
for x in range(nx):
Expand All @@ -1878,9 +1886,21 @@ def combine_cpu(aggs, selector_aggs):
aggs[0][y, x, cat] = aggs[1][y, x, cat]

@ngjit
def combine_cpu_n(aggs, selector_aggs):
# Generic solution for combining dask partitions of a where
# reduction with a selector that is a FloatingNReduction.
def combine_cpu_n_3d(aggs, selector_aggs):
ny, nx, n = aggs[0].shape
for y in range(ny):
for x in range(nx):
for i in range(n):
value = selector_aggs[1][y, x, i]
if invalid(value):
break
update_index = append(x, y, selector_aggs[0], value)
if update_index < 0:
break
shift_and_insert(aggs[0][y, x], aggs[1][y, x, i], update_index)

@ngjit
def combine_cpu_n_4d(aggs, selector_aggs):
ny, nx, ncat, n = aggs[0].shape
for y in range(ny):
for x in range(nx):
Expand Down Expand Up @@ -1955,10 +1975,9 @@ def wrapped_combine(aggs, selector_aggs):
combine_cuda_n_4d[cuda_args(aggs[0].shape[:3])](aggs, selector_aggs)
else:
if ndim == 3:
# 4d view of each agg, note use of numba typed list.
aggs = List([np.expand_dims(agg, 2) for agg in aggs])
selector_aggs = List([np.expand_dims(agg, 2) for agg in selector_aggs])
combine_cpu_n(aggs, selector_aggs)
combine_cpu_n_3d(aggs, selector_aggs)
else:
combine_cpu_n_4d(aggs, selector_aggs)
else:
# ndim is either 2 (ny, nx) or 3 (ny, nx, ncat)
if cuda:
Expand All @@ -1968,10 +1987,9 @@ def wrapped_combine(aggs, selector_aggs):
combine_cuda_3d[cuda_args(aggs[0].shape)](aggs, selector_aggs)
else:
if ndim == 2:
# 3d view of each agg, note use of numba typed list.
aggs = List([np.expand_dims(agg, 2) for agg in aggs])
selector_aggs = List([np.expand_dims(agg, 2) for agg in selector_aggs])
combine_cpu(aggs, selector_aggs)
combine_cpu_2d(aggs, selector_aggs)
else:
combine_cpu_3d(aggs, selector_aggs)

return ret

Expand Down