Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion dask_sql/physical/utils/filter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import itertools
import logging
import operator
from typing import List

import dask.dataframe as dd
import numpy as np
Expand All @@ -12,7 +13,11 @@
logger = logging.getLogger(__name__)


def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame:
def attempt_predicate_pushdown(
ddf: dd.DataFrame,
conjunctive_filters: List[tuple] = None,
disjunctive_filters: List[tuple] = None,
) -> dd.DataFrame:
"""Use graph information to update IO-level filters

The original `ddf` will be returned if/when the
Expand All @@ -24,6 +29,9 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame:
is due to the fact that `npartitions` and `divisions`
may change when this optimization is applied (invalidating
npartition/divisions-specific logic in following Layers).

Additionally applies the provided conjunctive and disjunctive filters,
if applicable.
"""

# Check that we have a supported `ddf` object
Expand Down Expand Up @@ -87,6 +95,11 @@ def attempt_predicate_pushdown(ddf: dd.DataFrame) -> dd.DataFrame:
)
return ddf

# Expand the filter set with provided conjunctive and disjunctive filters
filters.extend([f] for f in disjunctive_filters or [])
# Add conjunctive filters to each disjunctive filter
for f in filters:
f.extend(conjunctive_filters or [])
Comment on lines +98 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes sense, and I think I (mostly) like the approach. However, it does seem possible that you will eventually want to do things that are currently prohibited. Some examples:

  1. You want to make a disjunctive addition to your filters, and the new filter contains a conjunction that should not be added to any of the other (existing) filters. For example, you want to apply the existing filters, but you also want to allow null values for a specific user-id. (does this already work?)
  2. You want to add a conjunctive filter to a subset of the pre-existing disjunctive filters.
  3. You want to drop pre-existing filters.

Not sure if any of these cases are important or meaningful at all. Either way, I'm pretty sure you could support (1) and (3) later if needed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments. You're right that none of those three cases are really supported by the current addition; it only helps with passing additional filters that get applied to everything.
Ideally I'd like to consolidate the conjunctive and disjunctive args into a single arg in dnf form which allows some flexibility with the additional filters but doesn't allow good interactivity with existing ones.

While we're unable to return a full dnf for a dataframe rather than having some existing filters coming via the hlg and some passed in additionally, I can't think of a nice way for both to interact cleanly without some good tracking around how we represent that information.

# Regenerate collection with filtered IO layer
try:
return dsk.layers[name]._regenerate_collection(
Expand Down
95 changes: 95 additions & 0 deletions tests/unit/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import pandas as pd
import pytest
from dask import dataframe as dd
from dask.utils_test import hlg_layer

from dask_sql.physical.utils.filter import attempt_predicate_pushdown
from dask_sql.utils import Pluggable, is_frame


Expand Down Expand Up @@ -52,3 +54,96 @@ def test_overwrite():

assert PluginTest1.get_plugin("some_key") == "value_2"
assert PluginTest1().get_plugin("some_key") == "value_2"


def _get_pushed_filters(df):
    # The pushed-down predicates land on the ``filters`` kwarg of the
    # regenerated ``read-parquet`` IO layer. Normalize to a set-of-sets so
    # comparisons ignore the ordering of both the disjunctions and the
    # conjunctions within them.
    filters = hlg_layer(df.dask, "read-parquet").creation_info["kwargs"]["filters"]
    return frozenset(frozenset(conjunction) for conjunction in filters)


def _as_filter_set(filters):
    # Normalize an expected DNF filter expression the same way as
    # ``_get_pushed_filters`` so the two can be compared directly.
    return frozenset(frozenset(conjunction) for conjunction in filters)


def test_predicate_pushdown(parquet_ddf):
    """End-to-end checks of ``attempt_predicate_pushdown``.

    Scenarios covered:

    1. A single graph-derived conjunction.
    2. A graph-derived DNF expression (AND/OR mix).
    3. Extra ``disjunctive_filters`` (OR-ed with the graph's filters).
    4. Multiple extra disjunctive filters.
    5. Extra ``conjunctive_filters`` (AND-ed onto every disjunction).
    6. Conjunctive and disjunctive extras combined.
    """
    # 1. Single conjunction extracted from the graph: a > 1
    filtered_df = parquet_ddf[parquet_ddf["a"] > 1]
    pushdown_df = attempt_predicate_pushdown(filtered_df)
    assert _get_pushed_filters(pushdown_df) == _as_filter_set([[("a", ">", 1)]])

    # 2. DNF expression from the graph: (a > 1 AND b < 2) OR (a == -1)
    filtered_df = parquet_ddf[
        (parquet_ddf["a"] > 1) & (parquet_ddf["b"] < 2) | (parquet_ddf["a"] == -1)
    ]
    pushdown_df = attempt_predicate_pushdown(filtered_df)
    assert _get_pushed_filters(pushdown_df) == _as_filter_set(
        [[("a", ">", 1), ("b", "<", 2)], [("a", "==", -1)]]
    )

    # 3. A provided disjunctive filter becomes an additional OR branch
    disjunctive_filters = [("c", "in", ("A", "B", "C"))]
    pushdown_df = attempt_predicate_pushdown(
        filtered_df, disjunctive_filters=disjunctive_filters
    )
    assert _get_pushed_filters(pushdown_df) == _as_filter_set(
        [
            [("b", "<", 2), ("a", ">", 1)],
            [("a", "==", -1)],
            [("c", "in", ("A", "B", "C"))],
        ]
    )

    # 4. Each provided disjunctive filter becomes its own OR branch
    disjunctive_filters = [("c", "in", ("A", "B", "C")), ("b", "in", (5, 6, 7))]
    pushdown_df = attempt_predicate_pushdown(
        filtered_df, disjunctive_filters=disjunctive_filters
    )
    assert _get_pushed_filters(pushdown_df) == _as_filter_set(
        [
            [("b", "<", 2), ("a", ">", 1)],
            [("a", "==", -1)],
            [("c", "in", ("A", "B", "C"))],
            [("b", "in", (5, 6, 7))],
        ]
    )

    # 5. A provided conjunctive filter is AND-ed onto every OR branch
    conjunctive_filters = [("c", "in", ("A", "B", "C"))]
    pushdown_df = attempt_predicate_pushdown(
        filtered_df, conjunctive_filters=conjunctive_filters
    )
    assert _get_pushed_filters(pushdown_df) == _as_filter_set(
        [
            [("b", "<", 2), ("a", ">", 1), ("c", "in", ("A", "B", "C"))],
            [("a", "==", -1), ("c", "in", ("A", "B", "C"))],
        ]
    )

    # 6. Combined: disjunctive extras add OR branches, then conjunctive
    #    extras are AND-ed onto every branch (including the new ones)
    conjunctive_filters = [("c", "in", ("A", "B", "C")), ("a", "<=", 100)]
    disjunctive_filters = [("b", "in", (5, 6, 7)), ("a", ">=", 100)]
    pushdown_df = attempt_predicate_pushdown(
        filtered_df,
        conjunctive_filters=conjunctive_filters,
        disjunctive_filters=disjunctive_filters,
    )
    assert _get_pushed_filters(pushdown_df) == _as_filter_set(
        [
            [("b", "<", 2), ("a", ">", 1), ("c", "in", ("A", "B", "C")), ("a", "<=", 100)],
            [("a", "==", -1), ("c", "in", ("A", "B", "C")), ("a", "<=", 100)],
            [("b", "in", (5, 6, 7)), ("c", "in", ("A", "B", "C")), ("a", "<=", 100)],
            [("a", ">=", 100), ("c", "in", ("A", "B", "C")), ("a", "<=", 100)],
        ]
    )