From 7b17557decb53bc701782e08c3aa10709b1060c7 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 15 Mar 2022 14:39:42 -0700 Subject: [PATCH 01/23] basic predicate-pushdown support --- dask_sql/physical/rel/logical/filter.py | 3 +- dask_sql/physical/rel/logical/optimize.py | 275 ++++++++++++++++++++++ 2 files changed, 277 insertions(+), 1 deletion(-) create mode 100644 dask_sql/physical/rel/logical/optimize.py diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index 87c99e3e0..4325b3bcf 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -6,6 +6,7 @@ from dask_sql.datacontainer import DataContainer from dask_sql.physical.rel.base import BaseRelPlugin +from dask_sql.physical.rel.logical.optimize import predicate_pushdown from dask_sql.physical.rex import RexConverter if TYPE_CHECKING: @@ -31,7 +32,7 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Seri # In SQL, a NULL in a boolean is False on filtering filter_condition = filter_condition.fillna(False) - return df[filter_condition] + return predicate_pushdown(df[filter_condition]) class DaskFilterPlugin(BaseRelPlugin): diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py new file mode 100644 index 000000000..8e52ad5db --- /dev/null +++ b/dask_sql/physical/rel/logical/optimize.py @@ -0,0 +1,275 @@ +import operator + +import dask.dataframe as dd +import numpy as np +from dask.blockwise import Blockwise +from dask.layers import DataFrameIOLayer +from dask.utils import M, apply, is_arraylike + +_comparison_ops_and_exprs = { + operator.eq: ({"func": operator.eq}, "=="), + operator.ne: ({"func": operator.ne}, "!="), + operator.lt: ({"func": operator.lt}, "<"), + operator.le: ({"func": operator.le}, "<="), + operator.gt: ({"func": operator.gt}, ">"), + operator.ge: ({"func": operator.ge}, ">="), + np.greater: ({"func": np.greater}, ">"), + np.greater_equal: ({"func": np.greater_equal}, ">="), + np.less: ({"func": np.less}, "<"), + np.less_equal: ({"func": np.less_equal}, "<="), + np.equal: ({"func": np.equal}, "=="), + np.not_equal: ({"func": np.not_equal}, "!="), +} +_comparison_ops = {k: v[0] for k, v in _comparison_ops_and_exprs.items()} +_comparison_symbols = {k: v[1] for k, v in _comparison_ops_and_exprs.items()} +_supported_ops = { + **_comparison_ops.copy(), + operator.and_: {"func": operator.and_}, + operator.or_: {"func": operator.or_}, + operator.getitem: {"func": operator.getitem}, + M.fillna: {"func": dd.Series.fillna}, +} + + +class SimpleDispatch: + + """Simple dispatch class""" + + def __init__(self, name=None): + self._lookup = {} + if name: + self.__name__ = name + + def register(self, ref, func=None): + """Register dispatch of `func` on `ref`""" + + def wrapper(func): + if isinstance(ref, tuple): + for t in ref: + self.register(t, func) + else: + self._lookup[ref] = func + return func + + return wrapper(func) if func is not None else wrapper + + def dispatch(self, ref): + """Return the function implementation for the given ``ref``""" + lk = self._lookup + try: + return lk[ref] + except KeyError: + pass + raise TypeError(f"No dispatch for {ref}") + + def __call__(self, arg, *args, **kwargs): + """ + Call the corresponding method based on type of argument. 
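+ The lookup key is ``arg`` itself (e.g. ``operator.and_``), so the
+ registered implementation is selected by identity rather than type.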
+ """ + meth = self.dispatch(arg) + return meth(arg, *args, **kwargs) + + +dnf_filter_dispatch = SimpleDispatch("dnf_filter_dispatch") + + +def _get_blockwise_input(input_index, indices, dsk): + key = indices[input_index][0] + if indices[input_index][1] is None: + return key + return dsk.layers[key]._dnf_filter_expression(dsk) + + +_inv_symbol = { + ">": "<", + "<": ">", + ">=": "<=", + "<=": ">=", +} + + +def _inv(symbol): + return _inv_symbol.get(symbol, symbol) + + +@dnf_filter_dispatch.register(tuple(_comparison_symbols.keys())) +def comparison_dnf(op, indices: list, dsk): + left = _get_blockwise_input(0, indices, dsk) + right = _get_blockwise_input(1, indices, dsk) + if is_arraylike(left) and hasattr(left, "item") and left.size == 1: + left = left.item() + return (right, _inv(_comparison_symbols[op]), left) + if is_arraylike(right) and hasattr(right, "item") and right.size == 1: + right = right.item() + return (left, _comparison_symbols[op], right) + + +def _maybe_list(val): + if isinstance(val, tuple) and val and isinstance(val[0], (tuple, list)): + return list(val) + return [val] + + +@dnf_filter_dispatch.register((operator.and_, operator.or_)) +def logical_dnf(op, indices: list, dsk): + left = _get_blockwise_input(0, indices, dsk) + right = _get_blockwise_input(1, indices, dsk) + if op == operator.or_: + return _maybe_list(left), _maybe_list(right) + elif op == operator.and_: + return (left, right) + else: + raise ValueError + + +@dnf_filter_dispatch.register(operator.getitem) +def getitem_dnf(op, indices: list, dsk): + # Return dnf of key (selected by getitem) + key = _get_blockwise_input(1, indices, dsk) + return key + + +@dnf_filter_dispatch.register(dd.Series.fillna) +def fillna_dnf(op, indices: list, dsk): + # Return dnf of input collection + return _get_blockwise_input(0, indices, dsk) + + +class RegenerableLayer: + def __init__(self, layer, creation_info): + self.layer = layer + self.creation_info = creation_info + + def _regenerate_collection( + self, dsk, new_kwargs: dict = None, _regen_cache: dict = None, + ): + + # Return regenerated layer if the work was + # already done + _regen_cache = _regen_cache or {} + if self.layer.output in _regen_cache: + return _regen_cache[self.layer.output] + + # Recursively generate necessary inputs to + # this layer to generate the collection + inputs = [] + for key, ind in self.layer.indices: + if ind is None: + if isinstance(key, (str, tuple)) and key in dsk.layers: + continue + inputs.append(key) + elif key in self.layer.io_deps: + continue + else: + inputs.append( + dsk.layers[key]._regenerate_collection( + dsk, new_kwargs=new_kwargs, _regen_cache=_regen_cache, + ) + ) + + # Extract the callable func and key-word args. + # Then return a regenerated collection + func = self.creation_info.get("func", None) + if func is None: + raise ValueError( + "`_regenerate_collection` failed. " + "Not all HLG layers are regenerable." 
+ ) + regen_args = self.creation_info.get("args", []) + regen_kwargs = self.creation_info.get("kwargs", {}).copy() + regen_kwargs = {k: v for k, v in self.creation_info.get("kwargs", {}).items()} + regen_kwargs.update((new_kwargs or {}).get(self.layer.output, {})) + result = func(*inputs, *regen_args, **regen_kwargs) + _regen_cache[self.layer.output] = result + return result + + def _dnf_filter_expression(self, dsk): + """Return a DNF-formatted filter expression for the + graph terminating at this layer + """ + return dnf_filter_dispatch(self.creation_info["func"], self.layer.indices, dsk,) + + +class RegenerableGraph: + def __init__(self, layers, dependencies, dependents): + self.layers = layers + self.dependencies = dependencies + self.dependents = dependents + + @classmethod + def from_hlg(cls, graph): + _layers = {} + for key, layer in graph.layers.items(): + regenerable_layer = None + if isinstance(layer, DataFrameIOLayer): + regenerable_layer = RegenerableLayer(layer, layer.creation_info or {},) + elif isinstance(layer, Blockwise): + tasks = list(layer.dsk.values()) + if len(tasks) == 1 and tasks[0]: + if tasks[0][0] == apply: + creation_info = _supported_ops.get(tasks[0][1], None) + else: + creation_info = _supported_ops.get(tasks[0][0], None) + if creation_info: + regenerable_layer = RegenerableLayer(layer, creation_info) + + if regenerable_layer is None: + raise ValueError(f"Graph contains non-regenerable layer: {layer}") + + _layers[key] = regenerable_layer + + return RegenerableGraph( + _layers, graph.dependencies.copy(), graph.dependents.copy(), + ) + + +def predicate_pushdown(ddf): + + # Get output layer name and HLG + name = ddf._name + + # Start by converting the HLG to RegenerableGraph + try: + dsk = RegenerableGraph.from_hlg(ddf.dask) + except ValueError: + return ddf + + # Extract filters + try: + filters = dsk.layers[name]._dnf_filter_expression(dsk) + if filters: + if isinstance(filters[0], (list, tuple)): + filters = list(filters) + else: + filters = [filters] + else: + return ddf + if not isinstance(filters, list): + filters = [filters] + except ImportError: # (TypeError, ValueError): + # DNF dispatching failed for 1+ layers + return ddf + + # We were able to extract a DNF filter expression. + # Check that all layers are regenerable, and that + # the graph contains an IO layer with filters support. + # All layers besides the root IO layer should also + # support DNF dispatching. Otherwise, there could be + # something like column-assignment or data manipulation + # between the IO layer and the filter. 
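+ # Collect the (single) IO-layer key whose `creation_info` exposes an
+ # unpopulated `filters` kwarg; this is the layer that will be
+ # regenerated below with the extracted filters applied.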
+ io_layer = [] + for k, v in dsk.layers.items(): + if ( + isinstance(v.layer, DataFrameIOLayer) + and "filters" in v.creation_info.get("kwargs", {}) + and v.creation_info["kwargs"]["filters"] is None + ): + io_layer.append(k) + if len(io_layer) != 1: + return ddf + io_layer = io_layer.pop() + + # Regenerate collection with filtered IO layer + return dsk.layers[name]._regenerate_collection( + dsk, new_kwargs={io_layer: {"filters": filters}}, + ) From b5cb2cb85f1be5779d38a7f01b9c3bc8af8f4ea9 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 15 Mar 2022 16:09:20 -0700 Subject: [PATCH 02/23] remove explict Dispatch class --- dask_sql/physical/rel/logical/optimize.py | 353 +++++++++++----------- 1 file changed, 181 insertions(+), 172 deletions(-) diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 8e52ad5db..e8ac1bace 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -3,139 +3,111 @@ import dask.dataframe as dd import numpy as np from dask.blockwise import Blockwise +from dask.highlevelgraph import HighLevelGraph from dask.layers import DataFrameIOLayer from dask.utils import M, apply, is_arraylike -_comparison_ops_and_exprs = { - operator.eq: ({"func": operator.eq}, "=="), - operator.ne: ({"func": operator.ne}, "!="), - operator.lt: ({"func": operator.lt}, "<"), - operator.le: ({"func": operator.le}, "<="), - operator.gt: ({"func": operator.gt}, ">"), - operator.ge: ({"func": operator.ge}, ">="), - np.greater: ({"func": np.greater}, ">"), - np.greater_equal: ({"func": np.greater_equal}, ">="), - np.less: ({"func": np.less}, "<"), - np.less_equal: ({"func": np.less_equal}, "<="), - np.equal: ({"func": np.equal}, "=="), - np.not_equal: ({"func": np.not_equal}, "!="), -} -_comparison_ops = {k: v[0] for k, v in _comparison_ops_and_exprs.items()} -_comparison_symbols = {k: v[1] for k, v in _comparison_ops_and_exprs.items()} -_supported_ops = { - **_comparison_ops.copy(), - operator.and_: {"func": operator.and_}, - operator.or_: {"func": operator.or_}, - operator.getitem: {"func": operator.getitem}, - M.fillna: {"func": dd.Series.fillna}, -} - -class SimpleDispatch: +def predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: + """Use graph information to update IO-level filters - """Simple dispatch class""" + This is a special optimization that must be called + eagerly on a DataFrame collection when filters are + applied. The "eager" requirement for this optimization + is due to the fact that `npartitions` and `divisions` + may change when this optimization is applied (invalidating + npartition/divisions-specific logic in following Layers). + """ - def __init__(self, name=None): - self._lookup = {} - if name: - self.__name__ = name + # Get output layer name and HLG + name = ddf._name - def register(self, ref, func=None): - """Register dispatch of `func` on `ref`""" + # Start by converting the HLG to a `RegenerableGraph`. + # Succeeding here means that all layers in the graph + # are regenerable. 
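+ # If any layer cannot be converted, bail out here and return the
+ # original (un-optimized) collection unchanged.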
+ try: + dsk = RegenerableGraph.from_hlg(ddf.dask) + except ValueError: + return ddf - def wrapper(func): - if isinstance(ref, tuple): - for t in ref: - self.register(t, func) + # Extract a DNF-formatted filter expression + try: + filters = dsk.layers[name]._dnf_filter_expression(dsk) + if filters: + if isinstance(filters[0], (list, tuple)): + filters = list(filters) else: - self._lookup[ref] = func - return func - - return wrapper(func) if func is not None else wrapper - - def dispatch(self, ref): - """Return the function implementation for the given ``ref``""" - lk = self._lookup - try: - return lk[ref] - except KeyError: - pass - raise TypeError(f"No dispatch for {ref}") - - def __call__(self, arg, *args, **kwargs): - """ - Call the corresponding method based on type of argument. - """ - meth = self.dispatch(arg) - return meth(arg, *args, **kwargs) - - -dnf_filter_dispatch = SimpleDispatch("dnf_filter_dispatch") + filters = [filters] + else: + return ddf + if not isinstance(filters, list): + filters = [filters] + except ValueError: + # DNF dispatching failed for 1+ layers + return ddf + # We were able to extract a DNF filter expression. + # Check that we have an IO layer with filters support + io_layer = [] + for k, v in dsk.layers.items(): + if ( + isinstance(v.layer, DataFrameIOLayer) + and "filters" in v.creation_info.get("kwargs", {}) + and v.creation_info["kwargs"]["filters"] is None + ): + io_layer.append(k) + if len(io_layer) != 1: + return ddf + io_layer = io_layer.pop() -def _get_blockwise_input(input_index, indices, dsk): - key = indices[input_index][0] - if indices[input_index][1] is None: - return key - return dsk.layers[key]._dnf_filter_expression(dsk) + # Regenerate collection with filtered IO layer + return dsk.layers[name]._regenerate_collection( + dsk, new_kwargs={io_layer: {"filters": filters}}, + ) -_inv_symbol = { - ">": "<", - "<": ">", - ">=": "<=", - "<=": ">=", +# Define all supported comparison functions +# (and their mapping to a string expression) +_comparison_symbols = { + operator.eq: "==", + operator.ne: "!=", + operator.lt: "<", + operator.le: "<=", + operator.gt: ">", + operator.ge: ">=", + np.greater: ">", + np.greater_equal: ">=", + np.less: "<", + np.less_equal: "<=", + np.equal: "==", + np.not_equal: "!=", } +# Define set of all "regenerable" operations. 
+# Predicate pushdown is supported for graphs +# comprised of `Blockwise` layers based on these +# operations +_regenerable_ops = set(_comparison_symbols.keys()) | { + operator.and_, + operator.or_, + operator.getitem, + M.fillna, +} -def _inv(symbol): - return _inv_symbol.get(symbol, symbol) - - -@dnf_filter_dispatch.register(tuple(_comparison_symbols.keys())) -def comparison_dnf(op, indices: list, dsk): - left = _get_blockwise_input(0, indices, dsk) - right = _get_blockwise_input(1, indices, dsk) - if is_arraylike(left) and hasattr(left, "item") and left.size == 1: - left = left.item() - return (right, _inv(_comparison_symbols[op]), left) - if is_arraylike(right) and hasattr(right, "item") and right.size == 1: - right = right.item() - return (left, _comparison_symbols[op], right) - - -def _maybe_list(val): - if isinstance(val, tuple) and val and isinstance(val[0], (tuple, list)): - return list(val) - return [val] - - -@dnf_filter_dispatch.register((operator.and_, operator.or_)) -def logical_dnf(op, indices: list, dsk): - left = _get_blockwise_input(0, indices, dsk) - right = _get_blockwise_input(1, indices, dsk) - if op == operator.or_: - return _maybe_list(left), _maybe_list(right) - elif op == operator.and_: - return (left, right) - else: - raise ValueError - +# Specify functions that must be generated with +# a different API at the dataframe-collection level +_special_op_mappings = {M.fillna: dd.Series.fillna} -@dnf_filter_dispatch.register(operator.getitem) -def getitem_dnf(op, indices: list, dsk): - # Return dnf of key (selected by getitem) - key = _get_blockwise_input(1, indices, dsk) - return key +class RegenerableLayer: + """Regenerable Layer -@dnf_filter_dispatch.register(dd.Series.fillna) -def fillna_dnf(op, indices: list, dsk): - # Return dnf of input collection - return _get_blockwise_input(0, indices, dsk) - + Wraps ``dask.highlevelgraph.Layer`` to ensure that a + ``creation_info`` attribute is defined. This class + also defines the necessary methods for recursive + layer regeneration and filter-expression generation. + """ -class RegenerableLayer: def __init__(self, layer, creation_info): self.layer = layer self.creation_info = creation_info @@ -143,6 +115,9 @@ def __init__(self, layer, creation_info): def _regenerate_collection( self, dsk, new_kwargs: dict = None, _regen_cache: dict = None, ): + """Regenerate a Dask collection for this layer using the + provided inputs and key-word arguments + """ # Return regenerated layer if the work was # already done @@ -187,89 +162,123 @@ def _dnf_filter_expression(self, dsk): """Return a DNF-formatted filter expression for the graph terminating at this layer """ - return dnf_filter_dispatch(self.creation_info["func"], self.layer.indices, dsk,) + op = self.creation_info["func"] + if op in _comparison_symbols.keys(): + func = _comparison_dnf + elif op in (operator.and_, operator.or_): + func = _logical_dnf + elif op == operator.getitem: + func = _getitem_dnf + elif op == dd.Series.fillna: + func = _fillna_dnf + else: + raise ValueError(f"No DNF expression for {op}") + + return func(op, self.layer.indices, dsk) class RegenerableGraph: - def __init__(self, layers, dependencies, dependents): + """Regenerable Graph + + This class is similar to ``dask.highlevelgraph.HighLevelGraph``. + However, all layers in a ``RegenerableGraph`` graph must be + ``RegenerableLayer`` objects. 
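+ Layer dependencies are not tracked explicitly; they are rediscovered
+ from each layer's ``indices`` when a collection is regenerated or a
+ filter expression is extracted.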
+ """ + + def __init__(self, layers: dict): self.layers = layers - self.dependencies = dependencies - self.dependents = dependents @classmethod - def from_hlg(cls, graph): + def from_hlg(cls, hlg: HighLevelGraph): + """Construct a ``RegenerableGraph`` from a ``HighLevelGraph``""" + + if not isinstance(hlg, HighLevelGraph): + raise TypeError(f"Expected HighLevelGraph, got {type(hlg)}") + _layers = {} - for key, layer in graph.layers.items(): + for key, layer in hlg.layers.items(): regenerable_layer = None if isinstance(layer, DataFrameIOLayer): - regenerable_layer = RegenerableLayer(layer, layer.creation_info or {},) + regenerable_layer = RegenerableLayer(layer, layer.creation_info or {}) elif isinstance(layer, Blockwise): tasks = list(layer.dsk.values()) if len(tasks) == 1 and tasks[0]: + kwargs = {} if tasks[0][0] == apply: - creation_info = _supported_ops.get(tasks[0][1], None) + op = tasks[0][1] + options = tasks[0][3] + if isinstance(options, dict): + kwargs = options + elif ( + isinstance(options, tuple) + and options + and callable(options[0]) + ): + kwargs = options[0](*options[1:]) else: - creation_info = _supported_ops.get(tasks[0][0], None) - if creation_info: - regenerable_layer = RegenerableLayer(layer, creation_info) + op = tasks[0][0] + if op in _regenerable_ops: + regenerable_layer = RegenerableLayer( + layer, + { + "func": _special_op_mappings.get(op, op), + "kwargs": kwargs, + }, + ) if regenerable_layer is None: raise ValueError(f"Graph contains non-regenerable layer: {layer}") _layers[key] = regenerable_layer - return RegenerableGraph( - _layers, graph.dependencies.copy(), graph.dependents.copy(), - ) + return RegenerableGraph(_layers) -def predicate_pushdown(ddf): +def _get_blockwise_input(input_index, indices: list, dsk: RegenerableGraph): + key = indices[input_index][0] + if indices[input_index][1] is None: + return key + return dsk.layers[key]._dnf_filter_expression(dsk) - # Get output layer name and HLG - name = ddf._name - # Start by converting the HLG to RegenerableGraph - try: - dsk = RegenerableGraph.from_hlg(ddf.dask) - except ValueError: - return ddf +def _comparison_dnf(op, indices: list, dsk: RegenerableGraph): + left = _get_blockwise_input(0, indices, dsk) + right = _get_blockwise_input(1, indices, dsk) - # Extract filters - try: - filters = dsk.layers[name]._dnf_filter_expression(dsk) - if filters: - if isinstance(filters[0], (list, tuple)): - filters = list(filters) - else: - filters = [filters] - else: - return ddf - if not isinstance(filters, list): - filters = [filters] - except ImportError: # (TypeError, ValueError): - # DNF dispatching failed for 1+ layers - return ddf + def _inv(symbol: str): + return {">": "<", "<": ">", ">=": "<=", "<=": ">=",}.get(symbol, symbol) - # We were able to extract a DNF filter expression. - # Check that all layers are regenerable, and that - # the graph contains an IO layer with filters support. - # All layers besides the root IO layer should also - # support DNF dispatching. Otherwise, there could be - # something like column-assignment or data manipulation - # between the IO layer and the filter. 
- io_layer = [] - for k, v in dsk.layers.items(): - if ( - isinstance(v.layer, DataFrameIOLayer) - and "filters" in v.creation_info.get("kwargs", {}) - and v.creation_info["kwargs"]["filters"] is None - ): - io_layer.append(k) - if len(io_layer) != 1: - return ddf - io_layer = io_layer.pop() + if is_arraylike(left) and hasattr(left, "item") and left.size == 1: + left = left.item() + return (right, _inv(_comparison_symbols[op]), left) + if is_arraylike(right) and hasattr(right, "item") and right.size == 1: + right = right.item() + return (left, _comparison_symbols[op], right) - # Regenerate collection with filtered IO layer - return dsk.layers[name]._regenerate_collection( - dsk, new_kwargs={io_layer: {"filters": filters}}, - ) + +def _logical_dnf(op, indices: list, dsk: RegenerableGraph): + left = _get_blockwise_input(0, indices, dsk) + right = _get_blockwise_input(1, indices, dsk) + + def _maybe_list(val): + if isinstance(val, tuple) and val and isinstance(val[0], (tuple, list)): + return list(val) + return [val] + + if op == operator.or_: + return _maybe_list(left), _maybe_list(right) + elif op == operator.and_: + return (left, right) + else: + raise ValueError + + +def _getitem_dnf(op, indices: list, dsk: RegenerableGraph): + # Return dnf of key (selected by getitem) + key = _get_blockwise_input(1, indices, dsk) + return key + + +def _fillna_dnf(op, indices: list, dsk: RegenerableGraph): + # Return dnf of input collection + return _get_blockwise_input(0, indices, dsk) From 017f65eafd1c833f7df7ecdd25b6e5419488f18a Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 15 Mar 2022 16:18:00 -0700 Subject: [PATCH 03/23] use _Frame.fillna --- dask_sql/physical/rel/logical/optimize.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index e8ac1bace..1754ef81e 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -27,7 +27,7 @@ def predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: # are regenerable. try: dsk = RegenerableGraph.from_hlg(ddf.dask) - except ValueError: + except (ValueError, TypeError): return ddf # Extract a DNF-formatted filter expression @@ -96,7 +96,7 @@ def predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: # Specify functions that must be generated with # a different API at the dataframe-collection level -_special_op_mappings = {M.fillna: dd.Series.fillna} +_special_op_mappings = {M.fillna: dd._Frame.fillna} class RegenerableLayer: @@ -169,7 +169,7 @@ def _dnf_filter_expression(self, dsk): func = _logical_dnf elif op == operator.getitem: func = _getitem_dnf - elif op == dd.Series.fillna: + elif op == dd._Frame.fillna: func = _fillna_dnf else: raise ValueError(f"No DNF expression for {op}") From e08f6cf8f3ba50e5939ba11c1ad470dacecc6094 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 16 Mar 2022 06:40:18 -0700 Subject: [PATCH 04/23] cleanup comments --- dask_sql/physical/rel/logical/optimize.py | 42 ++++++++++++++--------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 1754ef81e..0b02347f7 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -47,16 +47,19 @@ def predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: return ddf # We were able to extract a DNF filter expression. 
- # Check that we have an IO layer with filters support + # Check that we have a single IO layer with `filters` support io_layer = [] for k, v in dsk.layers.items(): - if ( - isinstance(v.layer, DataFrameIOLayer) - and "filters" in v.creation_info.get("kwargs", {}) - and v.creation_info["kwargs"]["filters"] is None - ): + if isinstance(v.layer, DataFrameIOLayer): io_layer.append(k) + if ( + "filters" not in v.creation_info.get("kwargs", {}) + or v.creation_info["kwargs"]["filters"] is not None + ): + # No filters support, or filters is already set + return ddf if len(io_layer) != 1: + # Not a single IO layer return ddf io_layer = io_layer.pop() @@ -102,14 +105,14 @@ def predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: class RegenerableLayer: """Regenerable Layer - Wraps ``dask.highlevelgraph.Layer`` to ensure that a + Wraps ``dask.highlevelgraph.Blockwise`` to ensure that a ``creation_info`` attribute is defined. This class also defines the necessary methods for recursive layer regeneration and filter-expression generation. """ def __init__(self, layer, creation_info): - self.layer = layer + self.layer = layer # Original Blockwise layer reference self.creation_info = creation_info def _regenerate_collection( @@ -164,13 +167,13 @@ def _dnf_filter_expression(self, dsk): """ op = self.creation_info["func"] if op in _comparison_symbols.keys(): - func = _comparison_dnf + func = _blockwise_comparison_dnf elif op in (operator.and_, operator.or_): - func = _logical_dnf + func = _blockwise_logical_dnf elif op == operator.getitem: - func = _getitem_dnf + func = _blockwise_getitem_dnf elif op == dd._Frame.fillna: - func = _fillna_dnf + func = _blockwise_fillna_dnf else: raise ValueError(f"No DNF expression for {op}") @@ -182,7 +185,7 @@ class RegenerableGraph: This class is similar to ``dask.highlevelgraph.HighLevelGraph``. However, all layers in a ``RegenerableGraph`` graph must be - ``RegenerableLayer`` objects. + ``RegenerableLayer`` objects (which wrap ``Blockwise`` layers). 
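+ Constructing one via ``from_hlg`` raises ``ValueError`` if any layer
+ in the source graph is not regenerable.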
""" def __init__(self, layers: dict): @@ -235,13 +238,16 @@ def from_hlg(cls, hlg: HighLevelGraph): def _get_blockwise_input(input_index, indices: list, dsk: RegenerableGraph): + # Simple utility to get the required input expressions + # for a Blockwise layer (using indices) key = indices[input_index][0] if indices[input_index][1] is None: return key return dsk.layers[key]._dnf_filter_expression(dsk) -def _comparison_dnf(op, indices: list, dsk: RegenerableGraph): +def _blockwise_comparison_dnf(op, indices: list, dsk: RegenerableGraph): + # Return DNF expression pattern for a simple comparison left = _get_blockwise_input(0, indices, dsk) right = _get_blockwise_input(1, indices, dsk) @@ -250,13 +256,15 @@ def _inv(symbol: str): if is_arraylike(left) and hasattr(left, "item") and left.size == 1: left = left.item() + # Need inverse comparison in read_parquet return (right, _inv(_comparison_symbols[op]), left) if is_arraylike(right) and hasattr(right, "item") and right.size == 1: right = right.item() return (left, _comparison_symbols[op], right) -def _logical_dnf(op, indices: list, dsk: RegenerableGraph): +def _blockwise_logical_dnf(op, indices: list, dsk: RegenerableGraph): + # Return DNF expression pattern for logical "and" or "or" left = _get_blockwise_input(0, indices, dsk) right = _get_blockwise_input(1, indices, dsk) @@ -273,12 +281,12 @@ def _maybe_list(val): raise ValueError -def _getitem_dnf(op, indices: list, dsk: RegenerableGraph): +def _blockwise_getitem_dnf(op, indices: list, dsk: RegenerableGraph): # Return dnf of key (selected by getitem) key = _get_blockwise_input(1, indices, dsk) return key -def _fillna_dnf(op, indices: list, dsk: RegenerableGraph): +def _blockwise_fillna_dnf(op, indices: list, dsk: RegenerableGraph): # Return dnf of input collection return _get_blockwise_input(0, indices, dsk) From f63b814971565c8ffc3f492e5e006f68cd860d47 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 16 Mar 2022 07:32:36 -0700 Subject: [PATCH 05/23] test coverage --- dask_sql/physical/rel/logical/optimize.py | 33 +++++++++++++++++++---- tests/integration/test_filter.py | 28 +++++++++++++++++++ 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 0b02347f7..539d98668 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -1,4 +1,5 @@ import operator +import warnings import dask.dataframe as dd import numpy as np @@ -64,9 +65,21 @@ def predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: io_layer = io_layer.pop() # Regenerate collection with filtered IO layer - return dsk.layers[name]._regenerate_collection( - dsk, new_kwargs={io_layer: {"filters": filters}}, - ) + try: + return dsk.layers[name]._regenerate_collection( + dsk, new_kwargs={io_layer: {"filters": filters}}, + ) + except ValueError as err: + # Most-likely failed to apply filters in read_parquet. + # We can just bail on predicate pushdown, but we also + # raise a warning to encourage the user to file an issue. + warnings.warn( + f"Predicate pushdown failed. 
Please open a bug report at " + f"https://github.com/dask-contrib/dask-sql/issues/new/choose " + f"and include the following error message: {err}" + ) + + return ddf # Define all supported comparison functions @@ -273,10 +286,20 @@ def _maybe_list(val): return list(val) return [val] + def _maybe_tuple(val): + if isinstance(val, tuple) and val and isinstance(val[0], tuple): + return val + return (val,) + if op == operator.or_: - return _maybe_list(left), _maybe_list(right) + # NDF "or" is List[List[Tuple]] + return [_maybe_list(left), _maybe_list(right)] elif op == operator.and_: - return (left, right) + # NDF "and" is List[Tuple] + # However, we don't want to add the outer list + # until the filter is finished, or this expression + # is combined with another in an "or" expression + return _maybe_tuple(left) + _maybe_tuple(right) else: raise ValueError diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index ad98d4416..3879d61ce 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -1,8 +1,10 @@ import dask.dataframe as dd import pandas as pd import pytest +from dask.utils_test import hlg_layer from pandas.testing import assert_frame_equal +from dask_sql import Context from dask_sql._compat import INT_NAN_IMPLEMENTED @@ -122,3 +124,29 @@ def test_filter_year(c): expected_df = df[df["year"] < 2016] assert_frame_equal(expected_df, actual_df) + + +def test_predicate_pushdown_complicated(tmpdir): + + # Write simple parquet dataset + dd.from_pandas( + pd.DataFrame({"a": [1, 2, 3] * 5, "b": range(15), "c": ["A"] * 15}), + npartitions=3, + ).to_parquet(tmpdir) + + # Read back with dask and apply WHERE query + ddf = dd.read_parquet(tmpdir) + df = ddf.compute() + + context = Context() + context.create_table("my_table", ddf) + return_df = context.sql("SELECT * FROM my_table WHERE a < 3 AND (b > 1 AND b < 3)") + + # Check for predicate pushdown + assert hlg_layer(return_df.dask, "read-parquet").creation_info["kwargs"]["filters"] + return_df = return_df.compute() + + expected_df = df[((df["a"] < 3) & ((df["b"] > 1) & (df["b"] < 3)))] + assert_frame_equal( + return_df, expected_df, + ) From 4b1bc97ca14b86cbfa84dfa010ee057cd0c03e80 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 16 Mar 2022 08:15:54 -0700 Subject: [PATCH 06/23] improve test coverage --- tests/integration/fixtures.py | 29 +++++++++++++++++- tests/integration/test_filter.py | 50 +++++++++++++++++++------------- 2 files changed, 58 insertions(+), 21 deletions(-) diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index ed4aa13e1..5d4555e77 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -112,6 +112,27 @@ def datetime_table(): ) +@pytest.fixture() +def parquet_ddf(tmpdir): + + # Write simple parquet dataset + dd.from_pandas( + pd.DataFrame( + { + "a": [1, 2, 3] * 5, + "b": range(15), + "c": ["A"] * 15, + "d": [2001, 2002, 2003] * 5, + "index": range(15), + }, + ), + npartitions=3, + ).to_parquet(tmpdir) + + # Read back with dask and apply WHERE query + return dd.read_parquet(tmpdir, index="index") + + @pytest.fixture() def gpu_user_table_1(user_table_1): return cudf.from_pandas(user_table_1) if cudf else None @@ -149,6 +170,7 @@ def c( user_table_nan, string_table, datetime_table, + parquet_ddf, gpu_user_table_1, gpu_df, gpu_long_table, @@ -166,6 +188,7 @@ def c( "user_table_nan": user_table_nan, "string_table": string_table, "datetime_table": datetime_table, + "parquet_ddf": parquet_ddf, 
"gpu_user_table_1": gpu_user_table_1, "gpu_df": gpu_df, "gpu_long_table": gpu_long_table, @@ -180,7 +203,11 @@ def c( for df_name, df in dfs.items(): if df is None: continue - dask_df = dd.from_pandas(df, npartitions=3) + if hasattr(df, "npartitions"): + # df is already a dask collection + dask_df = df + else: + dask_df = dd.from_pandas(df, npartitions=3) c.create_table(df_name, dask_df) yield c diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index 3879d61ce..12821f81b 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -4,7 +4,6 @@ from dask.utils_test import hlg_layer from pandas.testing import assert_frame_equal -from dask_sql import Context from dask_sql._compat import INT_NAN_IMPLEMENTED @@ -126,27 +125,38 @@ def test_filter_year(c): assert_frame_equal(expected_df, actual_df) -def test_predicate_pushdown_complicated(tmpdir): - - # Write simple parquet dataset - dd.from_pandas( - pd.DataFrame({"a": [1, 2, 3] * 5, "b": range(15), "c": ["A"] * 15}), - npartitions=3, - ).to_parquet(tmpdir) - - # Read back with dask and apply WHERE query - ddf = dd.read_parquet(tmpdir) - df = ddf.compute() - - context = Context() - context.create_table("my_table", ddf) - return_df = context.sql("SELECT * FROM my_table WHERE a < 3 AND (b > 1 AND b < 3)") +@pytest.mark.parametrize( + "query,df_func", + [ + ("SELECT * FROM parquet_ddf WHERE b < 10", lambda x: x[x["b"] < 10],), + ( + "SELECT * FROM parquet_ddf WHERE a < 3 AND (b > 1 AND b < 5)", + lambda x: x[(x["a"] < 3) & ((x["b"] > 1) & (x["b"] < 5))], + ), + ( + "SELECT * FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", + lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)], + ), + ( + "SELECT a FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", + lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)][["a"]], + ), + ], +) +def test_predicate_pushdown_complicated(c, parquet_ddf, query, df_func): - # Check for predicate pushdown + # Check for predicate pushdown. + # We can use the `hlg_layer` utility to make sure the + # `filters` field has been populated in `creation_info` + return_df = c.sql(query) assert hlg_layer(return_df.dask, "read-parquet").creation_info["kwargs"]["filters"] - return_df = return_df.compute() - expected_df = df[((df["a"] < 3) & ((df["b"] > 1) & (df["b"] < 3)))] + # Check computed result is correct. 
+ # Note that we must sort by index to ensure + # ordering can be compared directly + return_df = return_df.compute() + df = parquet_ddf.compute() + expected_df = df_func(df) assert_frame_equal( - return_df, expected_df, + return_df.sort_index(), expected_df.sort_index(), ) From 7f78c5843b15e0c4ef9669a7e7966aedc7bec239 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 16 Mar 2022 09:55:39 -0700 Subject: [PATCH 07/23] add xfail test for dt accessor in predicate and fix test_show.py --- tests/integration/fixtures.py | 27 +++++++++++++++------------ tests/integration/test_filter.py | 9 ++++++++- tests/integration/test_show.py | 2 ++ 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 5d4555e77..766ac87ab 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -116,18 +116,21 @@ def datetime_table(): def parquet_ddf(tmpdir): # Write simple parquet dataset - dd.from_pandas( - pd.DataFrame( - { - "a": [1, 2, 3] * 5, - "b": range(15), - "c": ["A"] * 15, - "d": [2001, 2002, 2003] * 5, - "index": range(15), - }, - ), - npartitions=3, - ).to_parquet(tmpdir) + df = pd.DataFrame( + { + "a": [1, 2, 3] * 5, + "b": range(15), + "c": ["A"] * 15, + "d": [ + pd.Timestamp("2013-08-01 23:00:00"), + pd.Timestamp("2014-09-01 23:00:00"), + pd.Timestamp("2015-10-01 23:00:00"), + ] + * 5, + "index": range(15), + }, + ) + dd.from_pandas(df, npartitions=3).to_parquet(tmpdir) # Read back with dask and apply WHERE query return dd.read_parquet(tmpdir, index="index") diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index 12821f81b..d010ed6b4 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -128,7 +128,7 @@ def test_filter_year(c): @pytest.mark.parametrize( "query,df_func", [ - ("SELECT * FROM parquet_ddf WHERE b < 10", lambda x: x[x["b"] < 10],), + ("SELECT * FROM parquet_ddf WHERE b < 10", lambda x: x[x["b"] < 10]), ( "SELECT * FROM parquet_ddf WHERE a < 3 AND (b > 1 AND b < 5)", lambda x: x[(x["a"] < 3) & ((x["b"] > 1) & (x["b"] < 5))], @@ -141,6 +141,13 @@ def test_filter_year(c): "SELECT a FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)][["a"]], ), + pytest.param( + "SELECT * FROM parquet_ddf WHERE year(d) < 2015", + lambda x: x[x["d"].dt.year < 2015], + marks=pytest.mark.xfail( + reason="Predicate pushdown does not support datetime accessors." 
+ ), + ), ], ) def test_predicate_pushdown_complicated(c, parquet_ddf, query, df_func): diff --git a/tests/integration/test_show.py b/tests/integration/test_show.py index a04129489..41e315a95 100644 --- a/tests/integration/test_show.py +++ b/tests/integration/test_show.py @@ -43,6 +43,7 @@ def test_tables(c): "user_table_nan", "string_table", "datetime_table", + "parquet_ddf", ] if cudf is None else [ @@ -56,6 +57,7 @@ def test_tables(c): "user_table_nan", "string_table", "datetime_table", + "parquet_ddf", "gpu_user_table_1", "gpu_df", "gpu_long_table", From 60f91499807a78a936210c85b9cb316b3192d53c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 16 Mar 2022 10:02:39 -0700 Subject: [PATCH 08/23] fix some naming issues --- dask_sql/physical/rel/logical/filter.py | 4 ++-- dask_sql/physical/rel/logical/optimize.py | 5 ++++- tests/integration/test_filter.py | 4 +++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index 4325b3bcf..2ae183bd7 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -6,7 +6,7 @@ from dask_sql.datacontainer import DataContainer from dask_sql.physical.rel.base import BaseRelPlugin -from dask_sql.physical.rel.logical.optimize import predicate_pushdown +from dask_sql.physical.rel.logical.optimize import attempt_predicate_pushdown from dask_sql.physical.rex import RexConverter if TYPE_CHECKING: @@ -32,7 +32,7 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Seri # In SQL, a NULL in a boolean is False on filtering filter_condition = filter_condition.fillna(False) - return predicate_pushdown(df[filter_condition]) + return attempt_predicate_pushdown(df[filter_condition]) class DaskFilterPlugin(BaseRelPlugin): diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 539d98668..26033390e 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -9,9 +9,12 @@ from dask.utils import M, apply, is_arraylike -def predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: +def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: """Use graph information to update IO-level filters + The original `ddf` will be returned if/when the + predicate-pushdown optimization fails. + This is a special optimization that must be called eagerly on a DataFrame collection when filters are applied. The "eager" requirement for this optimization diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index d010ed6b4..c9d5840a3 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -144,13 +144,15 @@ def test_filter_year(c): pytest.param( "SELECT * FROM parquet_ddf WHERE year(d) < 2015", lambda x: x[x["d"].dt.year < 2015], + # This test will fail, because the filters will not get pushed + # down to read_parquet. However, the query should still succeed. marks=pytest.mark.xfail( reason="Predicate pushdown does not support datetime accessors." ), ), ], ) -def test_predicate_pushdown_complicated(c, parquet_ddf, query, df_func): +def test_predicate_pushdown(c, parquet_ddf, query, df_func): # Check for predicate pushdown. 
# We can use the `hlg_layer` utility to make sure the From 5d9b3696a0098de08f8c42ce9aeb9300b4942f06 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 16 Mar 2022 11:31:21 -0700 Subject: [PATCH 09/23] add config and use assert_eq --- dask_sql/physical/rel/logical/filter.py | 7 ++++++- dask_sql/sql-schema.yaml | 5 +++++ dask_sql/sql.yaml | 2 ++ tests/integration/test_filter.py | 28 ++++++++++--------------- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index 2ae183bd7..3ada797c6 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -1,6 +1,7 @@ import logging from typing import TYPE_CHECKING, Union +import dask.config as dask_config import dask.dataframe as dd import numpy as np @@ -32,7 +33,11 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Seri # In SQL, a NULL in a boolean is False on filtering filter_condition = filter_condition.fillna(False) - return attempt_predicate_pushdown(df[filter_condition]) + out = df[filter_condition] + if dask_config.get("sql.predicate_pushdown"): + return attempt_predicate_pushdown(out) + else: + return out class DaskFilterPlugin(BaseRelPlugin): diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index 06c766854..f65e4d344 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -26,3 +26,8 @@ properties: type: boolean description: | Whether sql identifiers are considered case sensitive while parsing. + + predicate_pushdown: + type: bool + description: | + Whether to try pushing down filter predicates into IO (when possible). diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index 1976e72c3..72f28c271 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -5,3 +5,5 @@ sql: identifier: case_sensitive: True + + predicate_pushdown: True diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index c9d5840a3..e3bc2ea60 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -2,7 +2,6 @@ import pandas as pd import pytest from dask.utils_test import hlg_layer -from pandas.testing import assert_frame_equal from dask_sql._compat import INT_NAN_IMPLEMENTED @@ -12,7 +11,7 @@ def test_filter(c, df): return_df = return_df.compute() expected_df = df[df["a"] < 2] - assert_frame_equal(return_df, expected_df) + dd.assert_eq(return_df, expected_df) def test_filter_scalar(c, df): @@ -20,25 +19,25 @@ def test_filter_scalar(c, df): return_df = return_df.compute() expected_df = df - assert_frame_equal(return_df, expected_df) + dd.assert_eq(return_df, expected_df) return_df = c.sql("SELECT * FROM df WHERE False") return_df = return_df.compute() expected_df = df.head(0) - assert_frame_equal(return_df, expected_df, check_index_type=False) + dd.assert_eq(return_df, expected_df, check_index_type=False) return_df = c.sql("SELECT * FROM df WHERE (1 = 1)") return_df = return_df.compute() expected_df = df - assert_frame_equal(return_df, expected_df) + dd.assert_eq(return_df, expected_df) return_df = c.sql("SELECT * FROM df WHERE (1 = 0)") return_df = return_df.compute() expected_df = df.head(0) - assert_frame_equal(return_df, expected_df, check_index_type=False) + dd.assert_eq(return_df, expected_df, check_index_type=False) def test_filter_complicated(c, df): @@ -46,7 +45,7 @@ def test_filter_complicated(c, df): return_df = return_df.compute() expected_df = df[((df["a"] < 3) & ((df["b"] > 1) & (df["b"] < 3)))] - 
assert_frame_equal( + dd.assert_eq( return_df, expected_df, ) @@ -59,7 +58,7 @@ def test_filter_with_nan(c): expected_df = pd.DataFrame({"c": [3]}, dtype="Int8") else: expected_df = pd.DataFrame({"c": [3]}, dtype="float") - assert_frame_equal( + dd.assert_eq( return_df, expected_df, ) @@ -68,7 +67,7 @@ def test_string_filter(c, string_table): return_df = c.sql("SELECT * FROM string_table WHERE a = 'a normal string'") return_df = return_df.compute() - assert_frame_equal( + dd.assert_eq( return_df, string_table.head(1), ) @@ -122,7 +121,7 @@ def test_filter_year(c): actual_df = c.sql("select * from datetime_test where year(dt) < 2016").compute() expected_df = df[df["year"] < 2016] - assert_frame_equal(expected_df, actual_df) + dd.assert_eq(expected_df, actual_df) @pytest.mark.parametrize( @@ -160,12 +159,7 @@ def test_predicate_pushdown(c, parquet_ddf, query, df_func): return_df = c.sql(query) assert hlg_layer(return_df.dask, "read-parquet").creation_info["kwargs"]["filters"] - # Check computed result is correct. - # Note that we must sort by index to ensure - # ordering can be compared directly - return_df = return_df.compute() + # Check computed result is correct df = parquet_ddf.compute() expected_df = df_func(df) - assert_frame_equal( - return_df.sort_index(), expected_df.sort_index(), - ) + dd.assert_eq(return_df, expected_df) From 6951a1dfad0e00ccbd94559ea9d931b950cf797a Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 16 Mar 2022 11:46:27 -0700 Subject: [PATCH 10/23] add logging events when predicate-pushdown bails --- dask_sql/physical/rel/logical/filter.py | 2 +- dask_sql/physical/rel/logical/optimize.py | 26 ++++++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index 3ada797c6..ef932db28 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -19,7 +19,7 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Series]): """ - Some (complex) SQL queries can lead to a strange condition which is always true or false. + Some (complex) SQL queries calogger = logging.getLogger(__name__)n lead to a strange condition which is always true or false. We do not need to filter in this case. See https://github.com/dask-contrib/dask-sql/issues/87. """ diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 26033390e..98823a9ad 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -1,5 +1,5 @@ +import logging import operator -import warnings import dask.dataframe as dd import numpy as np @@ -8,6 +8,8 @@ from dask.layers import DataFrameIOLayer from dask.utils import M, apply, is_arraylike +logger = logging.getLogger(__name__) + def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: """Use graph information to update IO-level filters @@ -32,6 +34,10 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: try: dsk = RegenerableGraph.from_hlg(ddf.dask) except (ValueError, TypeError): + logger.warning( + "Predicate pushdown optimization skipped. One or more " + "layers in the HighLevelGraph was not 'regenerable'." 
+ ) return ddf # Extract a DNF-formatted filter expression @@ -48,6 +54,10 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: filters = [filters] except ValueError: # DNF dispatching failed for 1+ layers + logger.warning( + "Predicate pushdown optimization skipped. One or more " + "layers has an unknown filter expression." + ) return ddf # We were able to extract a DNF filter expression. @@ -61,9 +71,18 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: or v.creation_info["kwargs"]["filters"] is not None ): # No filters support, or filters is already set + logger.warning( + "Predicate pushdown optimization skipped. The IO " + "layer does not support a `filters` argument, or " + "`filters` was already populated." + ) return ddf if len(io_layer) != 1: # Not a single IO layer + logger.warning( + f"Predicate pushdown optimization skipped. {len(io_layer)} " + f"IO layers detected, but only one IO layer is allowed." + ) return ddf io_layer = io_layer.pop() @@ -76,8 +95,9 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: # Most-likely failed to apply filters in read_parquet. # We can just bail on predicate pushdown, but we also # raise a warning to encourage the user to file an issue. - warnings.warn( - f"Predicate pushdown failed. Please open a bug report at " + logger.warning( + f"Predicate pushdown failed to apply filters: {filters}. " + f"Please open a bug report at " f"https://github.com/dask-contrib/dask-sql/issues/new/choose " f"and include the following error message: {err}" ) From 116d668021f1eeed2216da496e06f6e3b76b9484 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 17 Mar 2022 09:46:04 -0700 Subject: [PATCH 11/23] move bail logic earlier in function --- dask_sql/physical/rel/logical/optimize.py | 67 +++++++++++++---------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 98823a9ad..d923ed4a6 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -25,8 +25,44 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: npartition/divisions-specific logic in following Layers). """ - # Get output layer name and HLG - name = ddf._name + # Check that we have a supported `ddf` object + if not isinstance(ddf, dd.DataFrame): + raise ValueError( + f"Predicate pushdown optimization skipped. Type {type(ddf)} " + f"does not support predicate pushdown." + ) + elif not isinstance(ddf.dask, HighLevelGraph): + logger.warning( + f"Predicate pushdown optimization skipped. Graph must be " + f"a HighLevelGraph object (got {type(ddf.dask)})." + ) + return ddf + + # We were able to extract a DNF filter expression. + # Check that we have a single IO layer with `filters` support + io_layer = [] + for k, v in ddf.dask.layers.items(): + if isinstance(v, DataFrameIOLayer): + io_layer.append(k) + if ( + "filters" not in v.creation_info.get("kwargs", {}) + or v.creation_info["kwargs"]["filters"] is not None + ): + # No filters support, or filters is already set + logger.warning( + "Predicate pushdown optimization skipped. The IO " + "layer does not support a `filters` argument, or " + "`filters` was already populated." + ) + return ddf + if len(io_layer) != 1: + # Not a single IO layer + logger.warning( + f"Predicate pushdown optimization skipped. {len(io_layer)} " + f"IO layers detected, but only one IO layer is allowed." 
+ ) + return ddf + io_layer = io_layer.pop() # Start by converting the HLG to a `RegenerableGraph`. # Succeeding here means that all layers in the graph @@ -41,6 +77,7 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: return ddf # Extract a DNF-formatted filter expression + name = ddf._name try: filters = dsk.layers[name]._dnf_filter_expression(dsk) if filters: @@ -60,32 +97,6 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: ) return ddf - # We were able to extract a DNF filter expression. - # Check that we have a single IO layer with `filters` support - io_layer = [] - for k, v in dsk.layers.items(): - if isinstance(v.layer, DataFrameIOLayer): - io_layer.append(k) - if ( - "filters" not in v.creation_info.get("kwargs", {}) - or v.creation_info["kwargs"]["filters"] is not None - ): - # No filters support, or filters is already set - logger.warning( - "Predicate pushdown optimization skipped. The IO " - "layer does not support a `filters` argument, or " - "`filters` was already populated." - ) - return ddf - if len(io_layer) != 1: - # Not a single IO layer - logger.warning( - f"Predicate pushdown optimization skipped. {len(io_layer)} " - f"IO layers detected, but only one IO layer is allowed." - ) - return ddf - io_layer = io_layer.pop() - # Regenerate collection with filtered IO layer try: return dsk.layers[name]._regenerate_collection( From 600a020fe16954d6a27bce09491444a239b4e3e8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 17 Mar 2022 13:19:42 -0700 Subject: [PATCH 12/23] address easier code review comments --- dask_sql/physical/rel/logical/filter.py | 2 +- dask_sql/physical/rel/logical/optimize.py | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index ef932db28..3ada797c6 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -19,7 +19,7 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Series]): """ - Some (complex) SQL queries calogger = logging.getLogger(__name__)n lead to a strange condition which is always true or false. + Some (complex) SQL queries can lead to a strange condition which is always true or false. We do not need to filter in this case. See https://github.com/dask-contrib/dask-sql/issues/87. """ diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index d923ed4a6..1dfc7f655 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -44,23 +44,15 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: for k, v in ddf.dask.layers.items(): if isinstance(v, DataFrameIOLayer): io_layer.append(k) + creation_info = v.creatio_info if hasattr(v, "creation_info") else {} if ( - "filters" not in v.creation_info.get("kwargs", {}) - or v.creation_info["kwargs"]["filters"] is not None + "filters" not in creation_info.get("kwargs", {}) + or creation_info["kwargs"]["filters"] is not None ): # No filters support, or filters is already set - logger.warning( - "Predicate pushdown optimization skipped. The IO " - "layer does not support a `filters` argument, or " - "`filters` was already populated." - ) return ddf if len(io_layer) != 1: # Not a single IO layer - logger.warning( - f"Predicate pushdown optimization skipped. {len(io_layer)} " - f"IO layers detected, but only one IO layer is allowed." 
- ) return ddf io_layer = io_layer.pop() From 359cab008f280ab377dddd819610b5a7fbcf2ecc Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 17 Mar 2022 13:26:26 -0700 Subject: [PATCH 13/23] typo fix --- dask_sql/physical/rel/logical/optimize.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 1dfc7f655..9fdc74276 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -44,7 +44,7 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: for k, v in ddf.dask.layers.items(): if isinstance(v, DataFrameIOLayer): io_layer.append(k) - creation_info = v.creatio_info if hasattr(v, "creation_info") else {} + creation_info = v.creation_info if hasattr(v, "creation_info") else {} if ( "filters" not in creation_info.get("kwargs", {}) or creation_info["kwargs"]["filters"] is not None From 6abf658428ca473a7b38fc49b0e9524c9171280d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 17 Mar 2022 18:14:24 -0700 Subject: [PATCH 14/23] fix creation_info access bug --- dask_sql/physical/rel/logical/optimize.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 9fdc74276..99d00d9a0 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -44,7 +44,9 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: for k, v in ddf.dask.layers.items(): if isinstance(v, DataFrameIOLayer): io_layer.append(k) - creation_info = v.creation_info if hasattr(v, "creation_info") else {} + creation_info = ( + (v.creation_info or {}) if hasattr(v, "creation_info") else {} + ) if ( "filters" not in creation_info.get("kwargs", {}) or creation_info["kwargs"]["filters"] is not None From 94294f5be6c65a0d7ff21571998f00591247a7c2 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 18 Mar 2022 06:54:30 -0700 Subject: [PATCH 15/23] convert any expression to DNF --- dask_sql/physical/rel/logical/optimize.py | 79 +++++++++++++++-------- tests/integration/test_filter.py | 7 ++ 2 files changed, 59 insertions(+), 27 deletions(-) diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/rel/logical/optimize.py index 99d00d9a0..67e4026f5 100644 --- a/dask_sql/physical/rel/logical/optimize.py +++ b/dask_sql/physical/rel/logical/optimize.py @@ -1,3 +1,4 @@ +import itertools import logging import operator @@ -74,15 +75,10 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: name = ddf._name try: filters = dsk.layers[name]._dnf_filter_expression(dsk) - if filters: - if isinstance(filters[0], (list, tuple)): - filters = list(filters) - else: - filters = [filters] - else: + if not isinstance(filters, frozenset): + # No filters encountered return ddf - if not isinstance(filters, list): - filters = [filters] + filters = filters.to_list_tuple() except ValueError: # DNF dispatching failed for 1+ layers logger.warning( @@ -110,6 +106,51 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame: return ddf +class Or(frozenset): + """Helper class for 'OR' expressions""" + + def to_list_tuple(self): + # NDF "or" is List[List[Tuple]] + def _maybe_list(val): + if isinstance(val, tuple) and val and isinstance(val[0], (tuple, list)): + return list(val) + return [val] + + return [ + _maybe_list(val.to_list_tuple()) + if hasattr(val, "to_list_tuple") + else _maybe_list(val) + for val in self 
+ ] + + +class And(frozenset): + """Helper class for 'AND' expressions""" + + def to_list_tuple(self): + # NDF "and" is List[Tuple] + return tuple( + val.to_list_tuple() if hasattr(val, "to_list_tuple") else val + for val in self + ) + + +def to_dnf(expr): + """Normalize a boolean filter expression to disjunctive normal form (DNF)""" + + # Credit: https://stackoverflow.com/a/58372345 + if not isinstance(expr, (Or, And)): + result = Or((And((expr,)),)) + elif isinstance(expr, Or): + result = Or(se for e in expr for se in to_dnf(e)) + elif isinstance(expr, And): + total = [] + for c in itertools.product(*[to_dnf(e) for e in expr]): + total.append(And(se for e in c for se in e)) + result = Or(total) + return result + + # Define all supported comparison functions # (and their mapping to a string expression) _comparison_symbols = { @@ -301,33 +342,17 @@ def _inv(symbol: str): return (right, _inv(_comparison_symbols[op]), left) if is_arraylike(right) and hasattr(right, "item") and right.size == 1: right = right.item() - return (left, _comparison_symbols[op], right) + return to_dnf((left, _comparison_symbols[op], right)) def _blockwise_logical_dnf(op, indices: list, dsk: RegenerableGraph): # Return DNF expression pattern for logical "and" or "or" left = _get_blockwise_input(0, indices, dsk) right = _get_blockwise_input(1, indices, dsk) - - def _maybe_list(val): - if isinstance(val, tuple) and val and isinstance(val[0], (tuple, list)): - return list(val) - return [val] - - def _maybe_tuple(val): - if isinstance(val, tuple) and val and isinstance(val[0], tuple): - return val - return (val,) - if op == operator.or_: - # NDF "or" is List[List[Tuple]] - return [_maybe_list(left), _maybe_list(right)] + return to_dnf(Or([left, right])) elif op == operator.and_: - # NDF "and" is List[Tuple] - # However, we don't want to add the outer list - # until the filter is finished, or this expression - # is combined with another in an "or" expression - return _maybe_tuple(left) + _maybe_tuple(right) + return to_dnf(And([left, right])) else: raise ValueError diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index e3bc2ea60..2cba34283 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -140,6 +140,13 @@ def test_filter_year(c): "SELECT a FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)][["a"]], ), + ( + # Original filters NOT in disjunctive normal form + "SELECT a FROM parquet_ddf WHERE (parquet_ddf.b > 3 AND parquet_ddf.b < 10 OR parquet_ddf.a = 1) AND (parquet_ddf.c = 'A')", + lambda x: x[ + ((x["b"] > 3) & (x["b"] < 10) | (x["a"] == 1)) & (x["c"] == "A") + ][["a"]], + ), pytest.param( "SELECT * FROM parquet_ddf WHERE year(d) < 2015", lambda x: x[x["d"].dt.year < 2015], From f663e0b2ff53932ca1948b87fb59b475b2513878 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 18 Mar 2022 09:43:00 -0700 Subject: [PATCH 16/23] csv test coverage --- tests/integration/fixtures.py | 4 ++-- tests/integration/test_filter.py | 24 ++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 766ac87ab..383065b74 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -130,10 +130,10 @@ def parquet_ddf(tmpdir): "index": range(15), }, ) - dd.from_pandas(df, npartitions=3).to_parquet(tmpdir) + dd.from_pandas(df, npartitions=3).to_parquet(os.path.join(tmpdir, "parquet")) # Read back with dask 
and apply WHERE query - return dd.read_parquet(tmpdir, index="index") + return dd.read_parquet(os.path.join(tmpdir, "parquet"), index="index") @pytest.fixture() diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index 2cba34283..4504d7465 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -170,3 +170,27 @@ def test_predicate_pushdown(c, parquet_ddf, query, df_func): df = parquet_ddf.compute() expected_df = df_func(df) dd.assert_eq(return_df, expected_df) + + +def test_filtered_csv(tmpdir, c): + # Predicate pushdown is NOT supported for CSV data. + # This test just checks that the "attempted" + # predicate-pushdown logic does not lead to + # any unexpected errors + + # Write simple csv dataset + df = pd.DataFrame({"a": [1, 2, 3] * 5, "b": range(15), "c": ["A"] * 15,},) + dd.from_pandas(df, npartitions=3).to_csv(tmpdir + "/*.csv", index=False) + + # Read back with dask and apply WHERE query + csv_ddf = dd.read_csv(tmpdir + "/*.csv") + try: + c.create_table("my_csv_table", csv_ddf) + return_df = c.sql("SELECT * FROM my_csv_table WHERE b < 10") + finally: + c.drop_table("my_csv_table") + + # Check computed result is correct + df = csv_ddf.compute() + expected_df = df[df["b"] < 10] + dd.assert_eq(return_df, expected_df) From a18a1490505f598cb7e91d28815706a82c40ec51 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 18 Mar 2022 09:50:27 -0700 Subject: [PATCH 17/23] include IN coverage --- tests/integration/test_filter.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index 4504d7465..d1493b50b 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -136,6 +136,10 @@ def test_filter_year(c): "SELECT * FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)], ), + ( + "SELECT * FROM parquet_ddf WHERE b IN (1, 6)", + lambda x: x[(x["b"] == 1) | (x["b"] == 6)], + ), ( "SELECT a FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)][["a"]], From a3725fbb13465296c6a80f1c837ffca2c28a8953 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 18 Mar 2022 11:21:01 -0700 Subject: [PATCH 18/23] improve test rigor --- tests/integration/test_filter.py | 38 +++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index d1493b50b..a291c113d 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -125,24 +125,32 @@ def test_filter_year(c): @pytest.mark.parametrize( - "query,df_func", + "query,df_func,filters", [ - ("SELECT * FROM parquet_ddf WHERE b < 10", lambda x: x[x["b"] < 10]), + ( + "SELECT * FROM parquet_ddf WHERE b < 10", + lambda x: x[x["b"] < 10], + [[("b", "<", 10)]], + ), ( "SELECT * FROM parquet_ddf WHERE a < 3 AND (b > 1 AND b < 5)", lambda x: x[(x["a"] < 3) & ((x["b"] > 1) & (x["b"] < 5))], + [[("a", "<", 3), ("b", ">", 1), ("b", "<", 5)]], ), ( "SELECT * FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", lambda x: x[((x["b"] > 5) & (x["b"] < 10)) | (x["a"] == 1)], + [[("a", "==", 1)], [("b", "<", 10), ("b", ">", 5)]], ), ( "SELECT * FROM parquet_ddf WHERE b IN (1, 6)", lambda x: x[(x["b"] == 1) | (x["b"] == 6)], + [[("b", "<=", 1), ("b", ">=", 1)], [("b", "<=", 6), ("b", ">=", 6)]], ), ( "SELECT a FROM parquet_ddf WHERE (b > 5 AND b < 10) OR a = 1", lambda x: x[((x["b"] > 5) & 
(x["b"] < 10)) | (x["a"] == 1)][["a"]], + [[("a", "==", 1)], [("b", "<", 10), ("b", ">", 5)]], ), ( # Original filters NOT in disjunctive normal form @@ -150,25 +158,35 @@ def test_filter_year(c): lambda x: x[ ((x["b"] > 3) & (x["b"] < 10) | (x["a"] == 1)) & (x["c"] == "A") ][["a"]], + [ + [("c", "==", "A"), ("b", ">", 3), ("b", "<", 10)], + [("a", "==", 1), ("c", "==", "A")], + ], ), - pytest.param( + ( + # The predicate-pushdown optimization will be skipped here, + # because datetime accessors are not supported. However, + # the query should still succeed. "SELECT * FROM parquet_ddf WHERE year(d) < 2015", lambda x: x[x["d"].dt.year < 2015], - # This test will fail, because the filters will not get pushed - # down to read_parquet. However, the query should still succeed. - marks=pytest.mark.xfail( - reason="Predicate pushdown does not support datetime accessors." - ), + None, ), ], ) -def test_predicate_pushdown(c, parquet_ddf, query, df_func): +def test_predicate_pushdown(c, parquet_ddf, query, df_func, filters): # Check for predicate pushdown. # We can use the `hlg_layer` utility to make sure the # `filters` field has been populated in `creation_info` return_df = c.sql(query) - assert hlg_layer(return_df.dask, "read-parquet").creation_info["kwargs"]["filters"] + expect_filters = filters + got_filters = hlg_layer(return_df.dask, "read-parquet").creation_info["kwargs"][ + "filters" + ] + if expect_filters: + got_filters = frozenset(frozenset(v) for v in got_filters) + expect_filters = frozenset(frozenset(v) for v in filters) + assert got_filters == expect_filters # Check computed result is correct df = parquet_ddf.compute() From 38ca9fb6a99b6ab8ec9848af0e5449b6b41f94a9 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 22 Mar 2022 09:24:20 -0700 Subject: [PATCH 19/23] address code review --- dask_sql/physical/rel/logical/filter.py | 2 +- .../logical/optimize.py => utils/filter.py} | 0 tests/integration/test_filter.py | 19 ++++++++++--------- 3 files changed, 11 insertions(+), 10 deletions(-) rename dask_sql/physical/{rel/logical/optimize.py => utils/filter.py} (100%) diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index 3ada797c6..6e7078efd 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -7,8 +7,8 @@ from dask_sql.datacontainer import DataContainer from dask_sql.physical.rel.base import BaseRelPlugin -from dask_sql.physical.rel.logical.optimize import attempt_predicate_pushdown from dask_sql.physical.rex import RexConverter +from dask_sql.physical.utils.filter import attempt_predicate_pushdown if TYPE_CHECKING: import dask_sql diff --git a/dask_sql/physical/rel/logical/optimize.py b/dask_sql/physical/utils/filter.py similarity index 100% rename from dask_sql/physical/rel/logical/optimize.py rename to dask_sql/physical/utils/filter.py diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index a291c113d..345b9d9e1 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -2,6 +2,7 @@ import pandas as pd import pytest from dask.utils_test import hlg_layer +from pandas.testing import assert_frame_equal from dask_sql._compat import INT_NAN_IMPLEMENTED @@ -11,7 +12,7 @@ def test_filter(c, df): return_df = return_df.compute() expected_df = df[df["a"] < 2] - dd.assert_eq(return_df, expected_df) + assert_frame_equal(return_df, expected_df) def test_filter_scalar(c, df): @@ -19,25 +20,25 @@ def test_filter_scalar(c, df): return_df = 
return_df.compute() expected_df = df - dd.assert_eq(return_df, expected_df) + assert_frame_equal(return_df, expected_df) return_df = c.sql("SELECT * FROM df WHERE False") return_df = return_df.compute() expected_df = df.head(0) - dd.assert_eq(return_df, expected_df, check_index_type=False) + assert_frame_equal(return_df, expected_df, check_index_type=False) return_df = c.sql("SELECT * FROM df WHERE (1 = 1)") return_df = return_df.compute() expected_df = df - dd.assert_eq(return_df, expected_df) + assert_frame_equal(return_df, expected_df) return_df = c.sql("SELECT * FROM df WHERE (1 = 0)") return_df = return_df.compute() expected_df = df.head(0) - dd.assert_eq(return_df, expected_df, check_index_type=False) + assert_frame_equal(return_df, expected_df, check_index_type=False) def test_filter_complicated(c, df): @@ -45,7 +46,7 @@ def test_filter_complicated(c, df): return_df = return_df.compute() expected_df = df[((df["a"] < 3) & ((df["b"] > 1) & (df["b"] < 3)))] - dd.assert_eq( + assert_frame_equal( return_df, expected_df, ) @@ -58,7 +59,7 @@ def test_filter_with_nan(c): expected_df = pd.DataFrame({"c": [3]}, dtype="Int8") else: expected_df = pd.DataFrame({"c": [3]}, dtype="float") - dd.assert_eq( + assert_frame_equal( return_df, expected_df, ) @@ -67,7 +68,7 @@ def test_string_filter(c, string_table): return_df = c.sql("SELECT * FROM string_table WHERE a = 'a normal string'") return_df = return_df.compute() - dd.assert_eq( + assert_frame_equal( return_df, string_table.head(1), ) @@ -121,7 +122,7 @@ def test_filter_year(c): actual_df = c.sql("select * from datetime_test where year(dt) < 2016").compute() expected_df = df[df["year"] < 2016] - dd.assert_eq(expected_df, actual_df) + assert_frame_equal(expected_df, actual_df) @pytest.mark.parametrize( From 275609cfe347b3b95e13a21b60e089f4fda13391 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 24 Mar 2022 20:29:46 -0700 Subject: [PATCH 20/23] skip parquet tests when deps are not installed --- tests/integration/fixtures.py | 38 ++++++++++++++++++++++++-------- tests/integration/test_filter.py | 5 ++--- tests/integration/test_show.py | 2 -- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 2b504fec9..de327ca84 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -20,6 +20,19 @@ LocalCUDACluster = None +try: + import pyarrow as pq_dep +except ImportError: + try: + import fastparquet as pq_dep + except ImportError: + pq_dep = False + +PARQUET_MARK = pytest.mark.skipif( + bool(pq_dep), reason="Parquet dependencies not installed." 
+) + + @pytest.fixture() def timeseries_df(c): pdf = timeseries(freq="1d").compute().reset_index(drop=True) @@ -111,8 +124,8 @@ def datetime_table(): ) -@pytest.fixture() -def parquet_ddf(tmpdir): +@pytest.fixture(params=[pytest.param(None, marks=PARQUET_MARK)]) +def parquet_ddf(tmpdir, request): # Write simple parquet dataset df = pd.DataFrame( @@ -172,7 +185,6 @@ def c( user_table_nan, string_table, datetime_table, - parquet_ddf, gpu_user_table_1, gpu_df, gpu_long_table, @@ -190,7 +202,6 @@ def c( "user_table_nan": user_table_nan, "string_table": string_table, "datetime_table": datetime_table, - "parquet_ddf": parquet_ddf, "gpu_user_table_1": gpu_user_table_1, "gpu_df": gpu_df, "gpu_long_table": gpu_long_table, @@ -205,16 +216,25 @@ def c( for df_name, df in dfs.items(): if df is None: continue - if hasattr(df, "npartitions"): - # df is already a dask collection - dask_df = df - else: - dask_df = dd.from_pandas(df, npartitions=3) + dask_df = dd.from_pandas(df, npartitions=3) c.create_table(df_name, dask_df) yield c +@pytest.fixture() +def c_parquet(parquet_ddf): + # Use separate fixture for parquet tables + # in case the required dependencies are + # not available. + from dask_sql.context import Context + + c = Context() + c.create_table("parquet_ddf", parquet_ddf) + + yield c + + @pytest.fixture() def temporary_data_file(): temporary_data_file = os.path.join( diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index 345b9d9e1..27cdaf7b9 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -174,12 +174,11 @@ def test_filter_year(c): ), ], ) -def test_predicate_pushdown(c, parquet_ddf, query, df_func, filters): - +def test_predicate_pushdown(c_parquet, parquet_ddf, query, df_func, filters): # Check for predicate pushdown. # We can use the `hlg_layer` utility to make sure the # `filters` field has been populated in `creation_info` - return_df = c.sql(query) + return_df = c_parquet.sql(query) expect_filters = filters got_filters = hlg_layer(return_df.dask, "read-parquet").creation_info["kwargs"][ "filters" diff --git a/tests/integration/test_show.py b/tests/integration/test_show.py index 41e315a95..a04129489 100644 --- a/tests/integration/test_show.py +++ b/tests/integration/test_show.py @@ -43,7 +43,6 @@ def test_tables(c): "user_table_nan", "string_table", "datetime_table", - "parquet_ddf", ] if cudf is None else [ @@ -57,7 +56,6 @@ def test_tables(c): "user_table_nan", "string_table", "datetime_table", - "parquet_ddf", "gpu_user_table_1", "gpu_df", "gpu_long_table", From 3d2f6d383ce03558df97f7c1382b16b535fde5e6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 24 Mar 2022 20:38:21 -0700 Subject: [PATCH 21/23] fix bug --- tests/integration/fixtures.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 3b0be80ee..548dc6e95 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -29,10 +29,10 @@ try: import fastparquet as pq_dep except ImportError: - pq_dep = False + pq_dep = None PARQUET_MARK = pytest.mark.skipif( - bool(pq_dep), reason="Parquet dependencies not installed." + pq_dep is None, reason="Parquet dependencies not installed." 
) From f7187914deed49d0c418fe02d6e4610cc3b8c8cf Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 25 Mar 2022 06:20:59 -0700 Subject: [PATCH 22/23] add pyarrow dep to cluster workers --- .github/docker-compose.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/docker-compose.yaml b/.github/docker-compose.yaml index cfb7eb43f..92149fa44 100644 --- a/.github/docker-compose.yaml +++ b/.github/docker-compose.yaml @@ -11,5 +11,7 @@ services: container_name: dask-worker image: daskdev/dask:latest command: dask-worker dask-scheduler:8786 + environment: + EXTRA_CONDA_PACKAGES: "pyarrow>1.0.0" volumes: - /tmp:/tmp From 0c69a40d80635a40024e155e955c47de865513c6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 25 Mar 2022 07:20:53 -0700 Subject: [PATCH 23/23] roll back test skipping changes --- .github/docker-compose.yaml | 2 +- tests/integration/fixtures.py | 38 ++++++++------------------------ tests/integration/test_filter.py | 5 +++-- tests/integration/test_show.py | 2 ++ 4 files changed, 15 insertions(+), 32 deletions(-) diff --git a/.github/docker-compose.yaml b/.github/docker-compose.yaml index 92149fa44..56ec50b47 100644 --- a/.github/docker-compose.yaml +++ b/.github/docker-compose.yaml @@ -12,6 +12,6 @@ services: image: daskdev/dask:latest command: dask-worker dask-scheduler:8786 environment: - EXTRA_CONDA_PACKAGES: "pyarrow>1.0.0" + EXTRA_CONDA_PACKAGES: "pyarrow>1.0.0" # required for parquet IO volumes: - /tmp:/tmp diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index 548dc6e95..75b98a9f7 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -23,19 +23,6 @@ SCHEDULER_ADDR = os.getenv("DASK_SQL_TEST_SCHEDULER", None) -try: - import pyarrow as pq_dep -except ImportError: - try: - import fastparquet as pq_dep - except ImportError: - pq_dep = None - -PARQUET_MARK = pytest.mark.skipif( - pq_dep is None, reason="Parquet dependencies not installed." -) - - @pytest.fixture() def timeseries_df(c): pdf = timeseries(freq="1d").compute().reset_index(drop=True) @@ -127,8 +114,8 @@ def datetime_table(): ) -@pytest.fixture(params=[pytest.param(None, marks=PARQUET_MARK)]) -def parquet_ddf(tmpdir, request): +@pytest.fixture() +def parquet_ddf(tmpdir): # Write simple parquet dataset df = pd.DataFrame( @@ -188,6 +175,7 @@ def c( user_table_nan, string_table, datetime_table, + parquet_ddf, gpu_user_table_1, gpu_df, gpu_long_table, @@ -205,6 +193,7 @@ def c( "user_table_nan": user_table_nan, "string_table": string_table, "datetime_table": datetime_table, + "parquet_ddf": parquet_ddf, "gpu_user_table_1": gpu_user_table_1, "gpu_df": gpu_df, "gpu_long_table": gpu_long_table, @@ -219,25 +208,16 @@ def c( for df_name, df in dfs.items(): if df is None: continue - dask_df = dd.from_pandas(df, npartitions=3) + if hasattr(df, "npartitions"): + # df is already a dask collection + dask_df = df + else: + dask_df = dd.from_pandas(df, npartitions=3) c.create_table(df_name, dask_df) yield c -@pytest.fixture() -def c_parquet(parquet_ddf): - # Use separate fixture for parquet tables - # in case the required dependencies are - # not available. 
- from dask_sql.context import Context - - c = Context() - c.create_table("parquet_ddf", parquet_ddf) - - yield c - - @pytest.fixture() def temporary_data_file(): temporary_data_file = os.path.join( diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py index 27cdaf7b9..345b9d9e1 100644 --- a/tests/integration/test_filter.py +++ b/tests/integration/test_filter.py @@ -174,11 +174,12 @@ def test_filter_year(c): ), ], ) -def test_predicate_pushdown(c_parquet, parquet_ddf, query, df_func, filters): +def test_predicate_pushdown(c, parquet_ddf, query, df_func, filters): + # Check for predicate pushdown. # We can use the `hlg_layer` utility to make sure the # `filters` field has been populated in `creation_info` - return_df = c_parquet.sql(query) + return_df = c.sql(query) expect_filters = filters got_filters = hlg_layer(return_df.dask, "read-parquet").creation_info["kwargs"][ "filters" diff --git a/tests/integration/test_show.py b/tests/integration/test_show.py index a04129489..41e315a95 100644 --- a/tests/integration/test_show.py +++ b/tests/integration/test_show.py @@ -43,6 +43,7 @@ def test_tables(c): "user_table_nan", "string_table", "datetime_table", + "parquet_ddf", ] if cudf is None else [ @@ -56,6 +57,7 @@ def test_tables(c): "user_table_nan", "string_table", "datetime_table", + "parquet_ddf", "gpu_user_table_1", "gpu_df", "gpu_long_table",
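
The DNF normalization introduced in [PATCH 15/23] (and later relocated to dask_sql/physical/utils/filter.py in [PATCH 19/23]) can be exercised in isolation. The sketch below is a minimal, self-contained copy of the Or, And, and to_dnf helpers from that patch (comments lightly adjusted), applied to the non-DNF predicate covered by test_predicate_pushdown, (b > 3 AND b < 10 OR a = 1) AND c = 'A'. The resulting list-of-lists-of-tuples matches the expected `filters` value up to ordering; ordering is arbitrary because the helpers subclass frozenset, which is also why the test added in [PATCH 18/23] compares filters as frozensets of frozensets.

import itertools


class Or(frozenset):
    """Helper class for 'OR' expressions"""

    def to_list_tuple(self):
        # DNF "or" is List[List[Tuple]]
        def _maybe_list(val):
            if isinstance(val, tuple) and val and isinstance(val[0], (tuple, list)):
                return list(val)
            return [val]

        return [
            _maybe_list(val.to_list_tuple())
            if hasattr(val, "to_list_tuple")
            else _maybe_list(val)
            for val in self
        ]


class And(frozenset):
    """Helper class for 'AND' expressions"""

    def to_list_tuple(self):
        # DNF "and" is List[Tuple]
        return tuple(
            val.to_list_tuple() if hasattr(val, "to_list_tuple") else val
            for val in self
        )


def to_dnf(expr):
    """Normalize a boolean filter expression to disjunctive normal form (DNF)"""
    if not isinstance(expr, (Or, And)):
        result = Or((And((expr,)),))
    elif isinstance(expr, Or):
        result = Or(se for e in expr for se in to_dnf(e))
    elif isinstance(expr, And):
        total = []
        for c in itertools.product(*[to_dnf(e) for e in expr]):
            total.append(And(se for e in c for se in e))
        result = Or(total)
    return result


# (b > 3 AND b < 10 OR a = 1) AND c = 'A' -- not in DNF as written
expr = And(
    [
        Or([And([("b", ">", 3), ("b", "<", 10)]), And([("a", "==", 1)])]),
        ("c", "==", "A"),
    ]
)
filters = to_dnf(expr).to_list_tuple()

# Two conjunction groups, matching the expected filters in the test
# (inner ordering is arbitrary, hence the frozenset comparison):
expected = {
    frozenset({("b", ">", 3), ("b", "<", 10), ("c", "==", "A")}),
    frozenset({("a", "==", 1), ("c", "==", "A")}),
}
assert {frozenset(group) for group in filters} == expected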
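
The IN case added to test_predicate_pushdown in [PATCH 17/23] and [PATCH 18/23] expects `b IN (1, 6)` to be pushed down as two conjunction groups, with each value v expanded to v <= b <= v. As a plain-pandas sanity check (not part of the patch series), evaluating that DNF by OR-ing across groups and AND-ing within a group reproduces `(b == 1) | (b == 6)` on a b column like the one used in test_filtered_csv (range(15)):

import operator

import pandas as pd

# Expected pushed-down filters for "WHERE b IN (1, 6)"
dnf = [[("b", "<=", 1), ("b", ">=", 1)], [("b", "<=", 6), ("b", ">=", 6)]]

ops = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}

df = pd.DataFrame({"b": range(15)})

# Evaluate the DNF: OR across conjunction groups, AND within a group
mask = pd.Series(False, index=df.index)
for group in dnf:
    group_mask = pd.Series(True, index=df.index)
    for col, op, val in group:
        group_mask &= ops[op](df[col], val)
    mask |= group_mask

expected = (df["b"] == 1) | (df["b"] == 6)
assert (mask == expected).all()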