Merged

52 commits
7ca1b39
q3 functionality
sarahyurick Mar 28, 2023
04845bc
style and minor functionality changes
sarahyurick Mar 29, 2023
2ac5693
some cleanup
sarahyurick Apr 4, 2023
1fbcc76
Merge branch 'main' into dpp
sarahyurick Apr 11, 2023
8ca673d
save progress
sarahyurick Apr 11, 2023
535302e
Merge branch 'main' into dpp
sarahyurick Apr 13, 2023
848a357
use inlist instead of binaryexpr
sarahyurick Apr 13, 2023
903533d
fix cargo test
sarahyurick Apr 13, 2023
d1c2b0b
fix some queries
sarahyurick Apr 14, 2023
b7ecfb5
use with_max_passes=1 and remove todos
sarahyurick Apr 20, 2023
5af3c3f
add warning
sarahyurick Apr 20, 2023
11e3d55
only run dpp once
sarahyurick Apr 21, 2023
f7ac414
null handling and double dtype
sarahyurick May 2, 2023
b3f82e5
minor style fixes
sarahyurick May 2, 2023
3fd16f6
clippy
sarahyurick May 2, 2023
da73605
Merge branch 'main' into dpp
sarahyurick May 4, 2023
4ad27c3
use adp imports
sarahyurick May 4, 2023
bef8517
add jeremy suggestions and better type logic
sarahyurick May 11, 2023
6d79107
style fix
sarahyurick May 11, 2023
dc9c5ca
MORE int/float logic
sarahyurick May 11, 2023
e3c364d
style fix
sarahyurick May 11, 2023
e65b1e3
Merge branch 'main' into dpp
sarahyurick May 15, 2023
b6ef201
fix some bugs
sarahyurick May 15, 2023
56ef3c4
add dask_config
sarahyurick May 16, 2023
cffc055
check for duplicate tablescans
sarahyurick May 30, 2023
26a7c62
fix row iterator
sarahyurick May 30, 2023
15dadf0
clippy
sarahyurick May 30, 2023
46e6c69
clippy again
sarahyurick May 30, 2023
cfacae0
Merge branch 'main' into dpp
jdye64 May 31, 2023
39872a9
Merge branch 'main' into dpp
sarahyurick Jun 7, 2023
091a7f1
add support for custom filters to attempt_predicate_pushdown
rjzamora Jun 7, 2023
83ff856
remove comments
rjzamora Jun 7, 2023
87a7f5c
fix preserve_filters=False bug
rjzamora Jun 7, 2023
e6c9aa6
Merge remote-tracking branch 'upstream/main' into custom-filter-support
rjzamora Jun 7, 2023
8303e4f
copy changes from 1040
rjzamora Jun 12, 2023
8ca5d17
Merge branch 'main' into add-table-scan-filters
rjzamora Jun 12, 2023
13d81b4
fix kwarg bug
rjzamora Jun 12, 2023
91ca882
Merge branch 'main' into add-table-scan-filters
rjzamora Jun 12, 2023
6250fed
Merge remote-tracking branch 'sarahyurick/dpp' into add-table-scan-fi…
rjzamora Jun 12, 2023
6c921b2
avoid using list within predicate values
rjzamora Jun 12, 2023
a053683
Merge branch 'add-table-scan-filters' of https://github.com/rjzamora/…
rjzamora Jun 12, 2023
07644c0
Merge remote-tracking branch 'upstream/main' into add-table-scan-filters
rjzamora Jun 21, 2023
b0ca1e9
roll back accidental change
rjzamora Jun 21, 2023
94ed18c
Merge branch 'main' into add-table-scan-filters
rjzamora Jun 21, 2023
77286cb
Merge branch 'main' into add-table-scan-filters
ayushdg Jun 22, 2023
242abfb
Merge branch 'main' into add-table-scan-filters
ayushdg Jun 27, 2023
c2c5c73
Re-add all filters after conjunctive_dnf_filters
ayushdg Jun 27, 2023
c13d2c9
Planner: Update table_scan to return the pyexpr in addition to dnf tuple
ayushdg Jun 28, 2023
c9f8fb7
update table_scan logic to use the new filtered_dnf tuple format
ayushdg Jun 28, 2023
7e813f1
improve caching a bit
rjzamora Jun 30, 2023
63e20d8
Merge branch 'main' into add-table-scan-filters
ayushdg Jun 30, 2023
1bbaac1
Merge branch 'main' into add-table-scan-filters
ayushdg Jun 30, 2023
47 changes: 36 additions & 11 deletions dask_planner/src/sql/logical/table_scan.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{sync::Arc, vec};
 
 use datafusion_python::{
     datafusion_common::{DFSchema, ScalarValue},
@@ -19,6 +19,7 @@ pub struct PyTableScan {
     input: Arc<LogicalPlan>,
 }
 
+type FilterTuple = (String, String, Option<Vec<PyObject>>);
 #[pyclass(name = "FilteredResult", module = "dask_planner", subclass)]
 #[derive(Debug, Clone)]
 pub struct PyFilteredResult {
@@ -31,7 +32,7 @@ pub struct PyFilteredResult {
     // Expr(s) that can have their filtering logic performed in the pyarrow IO logic
     // are stored here in a DNF format that is expected by pyarrow.
     #[pyo3(get)]
-    pub filtered_exprs: Vec<(String, String, Vec<PyObject>)>,
+    pub filtered_exprs: Vec<(PyExpr, FilterTuple)>,
 }
 
 impl PyTableScan {
@@ -45,9 +46,10 @@ impl PyTableScan {
     /// it as well if needed.
     pub fn _expand_dnf_filter(
         filter: &Expr,
+        input: &Arc<LogicalPlan>,
         py: Python,
-    ) -> Result<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError> {
-        let mut filter_tuple: Vec<(String, String, Vec<PyObject>)> = Vec::new();
+    ) -> Result<Vec<(PyExpr, FilterTuple)>, DaskPlannerError> {
+        let mut filter_tuple: Vec<(PyExpr, FilterTuple)> = Vec::new();
 
         match filter {
             Expr::InList {
@@ -100,25 +102,48 @@ impl PyTableScan {
                         .collect();
 
                     filter_tuple.push((
-                        ident.unwrap_or(expr.canonical_name()),
-                        op.to_string(),
-                        il?,
+                        PyExpr::from(filter.clone(), Some(vec![input.clone()])),
+                        (
+                            ident.unwrap_or(expr.canonical_name()),
+                            op.to_string(),
+                            Some(il?),
+                        ),
                     ));
                     Ok(filter_tuple)
                 } else {
                     let er = DaskPlannerError::InvalidIOFilter(format!(
                         "Invalid identifying column Expr instance `{}`. using in Dask instead",
                         filter
                     ));
-                    Err::<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError>(er)
+                    Err::<Vec<(PyExpr, FilterTuple)>, DaskPlannerError>(er)
                 }
             }
+            Expr::IsNotNull(expr) => {
+                // Only handle simple Expr(s) for IsNotNull operations for now
+                let ident = match *expr.clone() {
+                    Expr::Column(col) => Ok(col.name),
+                    _ => Err(DaskPlannerError::InvalidIOFilter(format!(
+                        "Invalid IsNotNull Expr type `{}`. using in Dask instead",
+                        filter
+                    ))),
+                };
+
+                filter_tuple.push((
+                    PyExpr::from(filter.clone(), Some(vec![input.clone()])),
+                    (
+                        ident.unwrap_or(expr.canonical_name()),
+                        "is not".to_string(),
+                        None,
+                    ),
+                ));
+                Ok(filter_tuple)
+            }
             _ => {
                 let er = DaskPlannerError::InvalidIOFilter(format!(
                     "Unable to apply filter: `{}` to IO reader, using in Dask instead",
                     filter
                 ));
-                Err::<Vec<(String, String, Vec<PyObject>)>, DaskPlannerError>(er)
+                Err::<Vec<(PyExpr, FilterTuple)>, DaskPlannerError>(er)
             }
         }
     }
@@ -132,12 +157,12 @@ impl PyTableScan {
         filters: &[Expr],
         py: Python,
     ) -> PyFilteredResult {
-        let mut filtered_exprs: Vec<(String, String, Vec<PyObject>)> = Vec::new();
+        let mut filtered_exprs: Vec<(PyExpr, FilterTuple)> = Vec::new();
        let mut unfiltered_exprs: Vec<PyExpr> = Vec::new();
 
         filters
             .iter()
-            .for_each(|f| match PyTableScan::_expand_dnf_filter(f, py) {
+            .for_each(|f| match PyTableScan::_expand_dnf_filter(f, input, py) {
                 Ok(mut expanded_dnf_filter) => filtered_exprs.append(&mut expanded_dnf_filter),
                 Err(_e) => {
                     unfiltered_exprs.push(PyExpr::from(f.clone(), Some(vec![input.clone()])))
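For context, the new FilterTuple alias follows the pyarrow-style DNF predicate convention of (column, operator, values), and each filtered_exprs entry now pairs that tuple with the PyExpr it was derived from. A minimal Python sketch of the shape the planner hands back (the PyExpr stand-ins and literal values here are hypothetical, not output from a real plan):

# Hypothetical shape of PyFilteredResult.filtered_exprs on the Python side.
# Each entry pairs the original expression with a pyarrow-style DNF tuple:
# InList becomes (col, "in"/"not in", values); IsNotNull becomes (col, "is not", None).
pyexpr_in, pyexpr_notnull = object(), object()  # stand-ins for real PyExpr objects
filtered_exprs = [
    (pyexpr_in, ("a", "in", [1, 2, 3])),      # from: WHERE a IN (1, 2, 3)
    (pyexpr_notnull, ("b", "is not", None)),  # from: WHERE b IS NOT NULL
]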
10 changes: 7 additions & 3 deletions dask_sql/physical/rel/logical/filter.py
@@ -1,5 +1,5 @@
 import logging
-from typing import TYPE_CHECKING, Union
+from typing import TYPE_CHECKING, List, Union
 
 import dask.config as dask_config
 import dask.dataframe as dd
@@ -17,7 +17,11 @@
 logger = logging.getLogger(__name__)
 
 
-def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Series]):
+def filter_or_scalar(
+    df: dd.DataFrame,
+    filter_condition: Union[np.bool_, dd.Series],
+    add_filters: List = None,
+):
     """
     Some (complex) SQL queries can lead to a strange condition which is always true or false.
     We do not need to filter in this case.
@@ -35,7 +39,7 @@ def filter_or_scalar(
     filter_condition = filter_condition.fillna(False)
     out = df[filter_condition]
     if dask_config.get("sql.predicate_pushdown"):
-        return attempt_predicate_pushdown(out)
+        return attempt_predicate_pushdown(out, add_filters=add_filters)
     else:
         return out
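A rough usage sketch of the extended signature (the frame, condition, and add_filters values are hypothetical; the extra filters only come into play when the sql.predicate_pushdown config is enabled and the graph contains a filterable IO layer such as a parquet read):

import dask.dataframe as dd
import pandas as pd

from dask_sql.physical.rel.logical.filter import filter_or_scalar

ddf = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)

# Dask-side boolean condition, plus the equivalent pyarrow-style DNF
# predicate that attempt_predicate_pushdown may re-apply at read time.
out = filter_or_scalar(ddf, ddf["a"] > 1, add_filters=[("a", ">", 1)])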
32 changes: 28 additions & 4 deletions dask_sql/physical/rel/logical/table_scan.py
@@ -3,6 +3,8 @@
 from functools import reduce
 from typing import TYPE_CHECKING
 
+from dask.utils_test import hlg_layer
+
 from dask_sql.datacontainer import DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
 from dask_sql.physical.rel.logical.filter import filter_or_scalar
@@ -77,16 +79,38 @@ def _apply_projections(self, table_scan, dask_table, dc):
     def _apply_filters(self, table_scan, rel, dc, context):
         df = dc.df
         cc = dc.column_container
-        filters = table_scan.getFilters()
-        # All partial filters here are applied in conjunction (&)
-        if filters:
+        all_filters = table_scan.getFilters()
+        conjunctive_dnf_filters = table_scan.getDNFFilters().filtered_exprs
+        non_dnf_filters = table_scan.getDNFFilters().io_unfilterable_exprs
+
+        if conjunctive_dnf_filters:
+            # Extract the PyExprs from the conjunctive DNF filters
+            filter_exprs = [f[0] for f in conjunctive_dnf_filters]
+            if non_dnf_filters:
+                filter_exprs.extend(non_dnf_filters)
+
+            df_condition = reduce(
+                operator.and_,
+                [
+                    RexConverter.convert(rel, rex, dc, context=context)
+                    for rex in filter_exprs
+                ],
+            )
+            df = filter_or_scalar(
+                df, df_condition, add_filters=[f[1] for f in conjunctive_dnf_filters]
+            )
+        elif all_filters:
             df_condition = reduce(
                 operator.and_,
                 [
                     RexConverter.convert(rel, rex, dc, context=context)
-                    for rex in filters
+                    for rex in all_filters
                 ],
             )
             df = filter_or_scalar(df, df_condition)
+        try:
+            logger.debug(hlg_layer(df.dask, "read-parquet").creation_info)
+        except KeyError:
+            pass
 
         return DataContainer(df, cc)
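To make the hand-off explicit: each getDNFFilters().filtered_exprs entry carries both representations of the same predicate, so the Dask-side condition is rebuilt from the PyExpr halves while the tuple halves travel to the IO layer via add_filters. A schematic sketch with hypothetical entries:

# Stand-ins for the PyExpr halves produced by the Rust planner.
pyexpr_a, pyexpr_b = object(), object()
conjunctive_dnf_filters = [
    (pyexpr_a, ("a", "in", [1, 2, 3])),
    (pyexpr_b, ("b", "is not", None)),
]

# The Dask-side condition is built from the expression halves ...
filter_exprs = [f[0] for f in conjunctive_dnf_filters]
# ... while the DNF halves are re-applied at the parquet read.
add_filters = [f[1] for f in conjunctive_dnf_filters]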
31 changes: 26 additions & 5 deletions dask_sql/physical/utils/filter.py
@@ -129,10 +129,12 @@ def attempt_predicate_pushdown(
 
     # Regenerate collection with filtered IO layer
     try:
+        _regen_cache = {}
         return dsk.layers[name]._regenerate_collection(
             dsk,
             # TODO: shouldn't need to specify index=False after dask#9661 is merged
             new_kwargs={io_layer: {"filters": filters, "index": False}},
+            _regen_cache=_regen_cache,
         )
     except ValueError as err:
         # Most-likely failed to apply filters in read_parquet.
@@ -195,15 +197,33 @@ def __bool__(self) -> bool:
     @classmethod
     def normalize(cls, filters: _And | _Or | list | tuple | None):
         """Convert raw filters to the `_Or(_And)` DNF representation"""
+
+        def _valid_tuple(predicate: tuple):
+            col, op, val = predicate
+            if isinstance(col, tuple):
+                raise TypeError("filters must be List[Tuple] or List[List[Tuple]]")
+            if op in ("in", "not in"):
+                return (col, op, tuple(val))
+            else:
+                return predicate
+
+        def _valid_list(conjunction: list):
+            valid = []
+            for predicate in conjunction:
+                if not isinstance(predicate, tuple):
+                    raise TypeError(f"Predicate must be a tuple, got {predicate}")
+                valid.append(_valid_tuple(predicate))
+            return valid
+
         if not filters:
             result = None
         elif isinstance(filters, list):
             conjunctions = filters if isinstance(filters[0], list) else [filters]
-            result = cls._Or([cls._And(conjunction) for conjunction in conjunctions])
+            result = cls._Or(
+                [cls._And(_valid_list(conjunction)) for conjunction in conjunctions]
+            )
         elif isinstance(filters, tuple):
-            if isinstance(filters[0], tuple):
-                raise TypeError("filters must be List[Tuple] or List[List[Tuple]]")
-            result = cls._Or((cls._And((filters,)),))
+            result = cls._Or((cls._And((_valid_tuple(filters),)),))
         elif isinstance(filters, cls._Or):
             result = cls._Or(se for e in filters for se in cls.normalize(e))
         elif isinstance(filters, cls._And):
@@ -332,7 +352,8 @@ def _regenerate_collection(
 
         # Return regenerated layer if the work was
         # already done
-        _regen_cache = _regen_cache or {}
+        if _regen_cache is None:
+            _regen_cache = {}
         if self.layer.output in _regen_cache:
             return _regen_cache[self.layer.output]
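The practical effect of _valid_tuple is that list-valued in/not in predicate values are coerced to tuples (presumably so the predicate values stay hashable when regenerated layers are cached), while malformed nesting fails fast. A standalone restatement of the helper for illustration:

def _valid_tuple(predicate: tuple):
    # Mirrors the helper above: reject nested tuples, coerce in/not-in
    # value containers to tuples, and pass other predicates through.
    col, op, val = predicate
    if isinstance(col, tuple):
        raise TypeError("filters must be List[Tuple] or List[List[Tuple]]")
    if op in ("in", "not in"):
        return (col, op, tuple(val))
    return predicate

assert _valid_tuple(("x", "in", [1, 2])) == ("x", "in", (1, 2))
assert _valid_tuple(("y", ">", 0)) == ("y", ">", 0)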