Skip to content

Conversation

@2010YOUY01
Copy link
Contributor

@2010YOUY01 2010YOUY01 commented Nov 12, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

Background for dynamic filter: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/

The following queries can be used for quick global insights:

-- Q1
select min(l_shipdate) from lineitem;
-- Q2
select min(l_shipdate) from lineitem where l_returnflag = 'R';

Now Q1 can get executed very efficiently by directly check the file metadata if possible:

> explain select min(l_shipdate) from lineitem;
+---------------+-------------------------------+
| plan_type     | plan                          |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
|               | │       ProjectionExec      │ |
|               | │    --------------------   │ |
|               | │ min(lineitem.l_shipdate): │ |
|               | │         1992-01-02        │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │     PlaceholderRowExec    │ |
|               | └───────────────────────────┘ |
|               |                               |
+---------------+-------------------------------+
1 row(s) fetched.
Elapsed 0.007 seconds.

However for Q2 now it's still doing the whole scan, and it's possible to use dynamic filters to speed them up.

Benchmarking Q2

Setup

  1. Generate tpch-sf100 parquet file with tpchgen-cli -s 100 --format=parquet (https://github.com/clflushopt/tpchgen-rs/tree/main/tpchgen-cli)
  2. In datafusion-cli, run
CREATE EXTERNAL TABLE lineitem
STORED AS PARQUET
LOCATION '/Users/yongting/data/tpch_sf100/lineitem.parquet';

select min(l_shipdate) from lineitem where l_returnflag = 'R';

Result
Main: 0.55s
PR: 0.09s

Aggregate Dynamic Filter Pushdown Overview

For queries like

  -- `example_table(type TEXT, val INT)`
  SELECT min(val)
  FROM example_table
  WHERE type='A';

And example_table's physical representation is a partitioned parquet file with
column statistics

  • part-0.parquet: val {min=0, max=100}
  • part-1.parquet: val {min=100, max=200}
  • ...
  • part-100.parquet: val {min=10000, max=10100}

After scanning the 1st file, we know we only have to read files if their minimal
value on val column is less than 0, the minimal val value in the 1st file.

We can skip scanning the remaining file by implementing dynamic filter, the
intuition is we keep a shared data structure for current min in both AggregateExec
and DataSourceExec, and let it update during execution, so the scanner can
know during execution if it's possible to skip scanning certain files. See
physical optimizer rule FilterPushdown for details.

Implementation

Enable Condition

  • No grouping (no GROUP BY clause in the sql, only a single global group to aggregate)
  • The aggregate expression must be min/max, and evaluate directly on columns.
    Note multiple aggregate expressions that satisfy this requirement are allowed,
    and a dynamic filter will be constructed combining all applicable expr's
    states. See more in the following example with dynamic filter on multiple columns.

Filter Construction

The filter is kept in the DataSourceExec, and it will gets update during execution,
the reader will interpret it as "the upstream only needs rows that such filter
predicate is evaluated to true", and certain scanner implementation like parquet
can evalaute column statistics on those dynamic filters, to decide if they can
prune a whole range.
Examples

  • Expr: min(a), Dynamic Filter: a < a_cur_min
  • Expr: min(a), max(a), min(b), Dynamic Filter: (a < a_cur_min) OR (a > a_cur_max) OR (b < b_cur_min)

What changes are included in this PR?

The goal is is to let aggregate expressions MIN/MAX with only column reference as argument (e.g. min(col1)) support dynamic filter, the above implementation rationale has explained it further.

The implementation includes:

  1. Added AggrDynFilter struct, and it would be shared across different partition streams to store the current bounds for dynamic filter update.
  2. init_dynamic_filter is responsible checking the conditions for whether to enable dynamic filter in the current aggregate execution plan, and finally build the AggrDynFilter inside the operator.
  3. During aggregation execution, after evaluating each batch, the current bound is refreshed in the dynamic filter, enabling the scanner to skip prunable units using the latest runtime bounds. (now it's updating every batch, perhaps we can let them update every k batches to avoid overheads?)
  4. Updated gather_filters_for_pushdown and handle_child_pushdown_result API in AggregateExec to enable self dynamic filter generation and pushdown.
  5. Added a configuration to turn it on/off

TODO

  • Add tests for grouping set
  • Only update bounds if they're tightened, to reduce lock contention (follow-up perhaps)

Questions

Now the implementations only pushdown aggregates like min(col1), that the inner physical expression is exactly a column reference, I realized this might be too conservative. Should we always pushdown the dynamic filter, and let the PruningPredicate to decide if we can use the expression to skip partitions.

Examples:

  1. min(col1 + 1): we push down col1 + 1 < col1_plus_1_cur_min, and the PruningPredicate can use such expression to prune.
  2. min(pow(col1, 2)): we push down pow(col1, 2) < col1_pow_cur_min, and the PruningPredicate cannot interpret the inner physical expression, so it decides not to prune always

Are these changes tested?

Yes, optimize UTs and end-to-end tests

Are there any user-facing changes?

No

@github-actions github-actions bot added documentation Improvements or additions to documentation core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) common Related to common crate physical-plan Changes to the physical-plan crate labels Nov 12, 2025
})?;
// First get current partition's bound, then update the shared bound among
// all partitions.
let current_bound = acc.evaluate()?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let current_bound = acc.evaluate()?;
let current_bound = acc.evaluate()?;
if current_bound.is_null() {
continue;
}

?!
because it will affect the scalar_min() below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Additionally, maybe we should only update the shared bound if the local bound has tighten, to reduce lock contention.

@alamb alamb requested a review from adriangb November 12, 2025 14:36
@alamb alamb added the performance Make DataFusion faster label Nov 12, 2025
Comment on lines +510 to +512
/// During filter pushdown optimization, if a child node can accept this filter,
/// it remains `Some(..)` to enable dynamic filtering during aggregate execution;
/// otherwise, it is cleared to `None`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that currently "does the child node accept the filter" is a bit murky: even if it is says No it can still retain a reference e.g. for statistics pruning.

It seems to me we may need to expand the pushdown response from Yes/No to Exact/Inexact/Unsupported.

Or maybe we should check the Arc reference counts 😛? If no one else has a reference... no point in updating?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Also, what's the current semantics? Is Yes map to either Exact or InExact?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Precisely: Yes can mean Exact or Inexact but doesn’t differentiate between them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@2010YOUY01 how do you think we should handle this? I think we should always compute the filter for now and solve this problem of wasted effort holistically for all uses of dynamic filters later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get the issue yet. Do you have an example case for this 'wasted effort', like the child is replying Yes but it's not holding an dynamic filter, or other situations we should better disable the dynamic filter in the AggregateExec side.

One thing I feel also ambiguous is when a child replies, (e.g. for parquet) do we differentiate if they're using the dynamic filter for stat pruning, or evaluate filter row by row (what datafusion.execution.parquet.pushdown_filters controls). I'm only expecting this aggregate dynamic filter to skip pruneable units instead of doing row-level filtering .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I feel also ambiguous is when a child replies, (e.g. for parquet) do we differentiate if they're using the dynamic filter for stat pruning, or evaluate filter row by row

That's the crux of the issue:

// If pushdown_filters is false we tell our parents that they still have to handle the filters,
// even if we updated the predicate to include the filters (they will only be used for stats pruning).
if !pushdown_filters {
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
)
.with_updated_node(source));
}

So currently Parquet says No even when it stores a copy of the filters for stats pruning. So for this PR we should change it to always update the filters even if the child responded No.

If Parquet responded Yes instead it would possibly lead to incorrect results if it can't actually evaluate the filter row by row.

For operators like HashJoinExec that may want to push down an expression that cannot be used for stats pruning the "wasted effort" would be if they still do compute to update the filters but then ParquetSource can't use them for stats pruning and doesn't push them down row-by-row.

So in summary:

  1. I think this PR should always push down the filters even if the answer it got back is No because ParquetSource may still use them for stats pruning.
  2. We should make a ticket to improve the system to have 3 states Exact/Inexact/Unsupported

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the issue now — the current reply semantics are quite ambiguous.

I’ve opened #18856
. Besides making the reply more precise, I also suggested allowing the pushdown source to specify its intent (for example, push down only for stats pruning, not for row-level filtering), since some predicates are very expensive and we’d like finer control. Let’s move the discussion there if you have additional points!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated to use the ref-count approach to enable/disable the aggregate dynamic filter in 2f58de3

Copy link
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is super cool @2010YOUY01 ! I hadn't even thought of this use case. Truly amazing.

I left a small comment for now. Overall the change looks good but requires more in depth review. I'll try over the next couple days but am on vacation so it may take a week 🙏🏻

@2010YOUY01
Copy link
Contributor Author

This is super cool @2010YOUY01 ! I hadn't even thought of this use case. Truly amazing.

I left a small comment for now. Overall the change looks good but requires more in depth review. I'll try over the next couple days but am on vacation so it may take a week 🙏🏻

Thanks! Enjoy your vacation. 😄

Comment on lines +1319 to +1320
let dyn_filter = self.dynamic_filter.as_ref().unwrap();
let child_accepts_dyn_filter = Arc::strong_count(dyn_filter) > 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious what @LiaCastaneda thinks of this? It seems like quite an elegant way to say "is anyone using this filter". If no one has a reference to it it's almost certain no one is using it... We could even make a method like:

struct DynamicFilterPhysicalExpr {
    fn is_used(self: Arc<Self>) -> bool { Arc::strong_count(self) > 1 }
}

?

Copy link
Contributor

@LiaCastaneda LiaCastaneda Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me! is_used would even help solve #17527

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess any sort of distributed plan would need a reference to the filter to listen for updates anyway, so this should be compatible with those use cases.

Copy link
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is good for now! The remaining issues are larger scale improvements that need to be made to filter pushdown / dynamic filters.

@adriangb
Copy link
Contributor

@2010YOUY01 sorry for the delay in reviewing, could you resolve conflicts and we can move to merge this?

@2010YOUY01
Copy link
Contributor Author

@2010YOUY01 sorry for the delay in reviewing, could you resolve conflicts and we can move to merge this?

Thank you for the review. Conflicts resolved.

@alamb Do you want to review again before merging?

@2010YOUY01
Copy link
Contributor Author

Let's merge it now — if anyone has additional suggestions, I'd be happy to update it in follow-ups.

Thanks again for the review! @martin-g and @adriangb

@2010YOUY01 2010YOUY01 added this pull request to the merge queue Dec 1, 2025
Merged via the queue into apache:main with commit 96a5f21 Dec 1, 2025
33 checks passed
@alamb
Copy link
Contributor

alamb commented Dec 1, 2025

@2010YOUY01 sorry for the delay in reviewing, could you resolve conflicts and we can move to merge this?

Thank you for the review. Conflicts resolved.

@alamb Do you want to review again before merging?

BTW please don't hold up merging waiting on me to review. I am no longer able to keep up with all reviews and clearly there is a great group of committers driving things forward.

ShashidharM0118 pushed a commit to ShashidharM0118/datafusion that referenced this pull request Dec 1, 2025
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes apache#123` indicates that this PR will close issue apache#123.
-->

- Closes #.

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
Background for dynamic filter:
https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/

The following queries can be used for quick global insights:
```
-- Q1
select min(l_shipdate) from lineitem;
-- Q2
select min(l_shipdate) from lineitem where l_returnflag = 'R';
```

Now Q1 can get executed very efficiently by directly check the file
metadata if possible:
```
> explain select min(l_shipdate) from lineitem;
+---------------+-------------------------------+
| plan_type     | plan                          |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
|               | │       ProjectionExec      │ |
|               | │    --------------------   │ |
|               | │ min(lineitem.l_shipdate): │ |
|               | │         1992-01-02        │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │     PlaceholderRowExec    │ |
|               | └───────────────────────────┘ |
|               |                               |
+---------------+-------------------------------+
1 row(s) fetched.
Elapsed 0.007 seconds.
```
However for Q2 now it's still doing the whole scan, and it's possible to
use dynamic filters to speed them up.

### Benchmarking Q2
#### Setup
1. Generate tpch-sf100 parquet file with `tpchgen-cli -s 100
--format=parquet`
(https://github.com/clflushopt/tpchgen-rs/tree/main/tpchgen-cli)
2. In datafusion-cli, run
```
CREATE EXTERNAL TABLE lineitem
STORED AS PARQUET
LOCATION '/Users/yongting/data/tpch_sf100/lineitem.parquet';

select min(l_shipdate) from lineitem where l_returnflag = 'R';
```
**Result**
Main: 0.55s
PR: 0.09s

### Aggregate Dynamic Filter Pushdown Overview

For queries like
```sql
  -- `example_table(type TEXT, val INT)`
  SELECT min(val)
  FROM example_table
  WHERE type='A';
```

And `example_table`'s physical representation is a partitioned parquet
file with
column statistics
- part-0.parquet: val {min=0, max=100}
- part-1.parquet: val {min=100, max=200}
- ...
- part-100.parquet: val {min=10000, max=10100}

After scanning the 1st file, we know we only have to read files if their
minimal
value on `val` column is less than 0, the minimal `val` value in the 1st
file.

We can skip scanning the remaining file by implementing dynamic filter,
the
intuition is we keep a shared data structure for current min in both
`AggregateExec`
and `DataSourceExec`, and let it update during execution, so the scanner
can
know during execution if it's possible to skip scanning certain files.
See
physical optimizer rule `FilterPushdown` for details.

### Implementation

#### Enable Condition
- No grouping (no `GROUP BY` clause in the sql, only a single global
group to aggregate)
- The aggregate expression must be `min`/`max`, and evaluate directly on
columns.
Note multiple aggregate expressions that satisfy this requirement are
allowed,
and a dynamic filter will be constructed combining all applicable expr's
states. See more in the following example with dynamic filter on
multiple columns.

#### Filter Construction
The filter is kept in the `DataSourceExec`, and it will gets update
during execution,
the reader will interpret it as "the upstream only needs rows that such
filter
predicate is evaluated to true", and certain scanner implementation like
`parquet`
can evalaute column statistics on those dynamic filters, to decide if
they can
prune a whole range.
**Examples**
- Expr: `min(a)`, Dynamic Filter: `a < a_cur_min`
- Expr: `min(a), max(a), min(b)`, Dynamic Filter: `(a < a_cur_min) OR (a
> a_cur_max) OR (b < b_cur_min)`

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
The goal is is to let aggregate expressions `MIN/MAX` with only column
reference as argument (e.g. min(col1)) support dynamic filter, the above
implementation rationale has explained it further.

The implementation includes:
1. Added `AggrDynFilter` struct, and it would be shared across different
partition streams to store the current bounds for dynamic filter update.
2. `init_dynamic_filter` is responsible checking the conditions for
whether to enable dynamic filter in the current aggregate execution
plan, and finally build the `AggrDynFilter` inside the operator.
3. During aggregation execution, after evaluating each batch, the
current bound is refreshed in the dynamic filter, enabling the scanner
to skip prunable units using the latest runtime bounds. (now it's
updating every batch, perhaps we can let them update every k batches to
avoid overheads?)
4. Updated `gather_filters_for_pushdown` and
`handle_child_pushdown_result` API in `AggregateExec` to enable self
dynamic filter generation and pushdown.
5. Added a configuration to turn it on/off

### TODO
- [x] Add tests for grouping set
- [ ] Only update bounds if they're tightened, to reduce lock contention
(follow-up perhaps)

### Questions

Now the implementations only pushdown aggregates like `min(col1)`, that
the inner physical expression is exactly a column reference, I realized
this might be too conservative. Should we always pushdown the dynamic
filter, and let the
[PruningPredicate](https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html)
to decide if we can use the expression to skip partitions.

Examples:
1. min(col1 + 1): we push down `col1 + 1 < col1_plus_1_cur_min`, and the
`PruningPredicate` can use such expression to prune.
2. min(pow(col1, 2)): we push down `pow(col1, 2) < col1_pow_cur_min`,
and the `PruningPredicate` cannot interpret the inner physical
expression, so it decides not to prune always

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
3. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes, optimize UTs and end-to-end tests

## Are there any user-facing changes?
No
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common Related to common crate core Core DataFusion crate documentation Improvements or additions to documentation performance Make DataFusion faster physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants