Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};

/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
///
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
/// the same `ExecutionPlan` is reused with different data.
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
/// The original children of this PhysicalExpr, if any.
Expand Down Expand Up @@ -121,6 +125,10 @@ impl DynamicFilterPhysicalExpr {
/// do not change* since those will be used to determine what columns need to read or projected
/// when evaluating the expression.
///
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
/// the same `ExecutionPlan` is reused with different data.
///
/// [`collect_columns`]: crate::utils::collect_columns
pub fn new(
children: Vec<Arc<dyn PhysicalExpr>>,
Expand Down
25 changes: 25 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;

/// Reset any internal state within this [`ExecutionPlan`].
///
/// This method is called when an [`ExecutionPlan`] needs to be re-executed,
/// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
/// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
/// are reset to their initial state.
///
/// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
/// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
/// necessarily resetting any internal state. Implementations that require resetting of some
/// internal state should override this method to provide the necessary logic.
///
/// This method should *not* reset state recursively for children, as it is expected that
/// it will be called from within a walk of the execution plan tree so that it will be called on each child later
/// or was already called on each child.
///
/// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
/// thus it is expected that any cached plan properties will remain valid after the reset.
///
/// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should also add a note to DynamicFilterPhysicalExpr saying any ExecutionPlan that uses them should also implement reset_state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
let children = self.children().into_iter().cloned().collect();
self.with_new_children(children)
}

/// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
/// produce `target_partitions` partitions.
///
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ fn assign_work_table(
}

/// Some plans will change their internal states after execution, making them unable to be executed again.
/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states.
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
///
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
/// However, if the data of the left table is derived from the work table, it will become outdated
Expand All @@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPl
if plan.as_any().is::<WorkTableExec>() {
Ok(Transformed::no(plan))
} else {
let new_plan = Arc::clone(&plan)
.with_new_children(plan.children().into_iter().cloned().collect())?;
let new_plan = Arc::clone(&plan).reset_state()?;
Ok(Transformed::yes(new_plan))
}
})
Expand Down
91 changes: 69 additions & 22 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,29 @@ impl SortExec {
self
}

/// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`.
fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
let children = self
.expr
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
.collect::<Vec<_>>();
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
}

fn cloned(&self) -> Self {
SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
common_sort_prefix: self.common_sort_prefix.clone(),
fetch: self.fetch,
cache: self.cache.clone(),
filter: self.filter.clone(),
}
}

/// Modify how many rows to include in the result
///
/// If None, then all rows will be returned, in sorted order.
Expand All @@ -926,25 +949,13 @@ impl SortExec {
}
let filter = fetch.is_some().then(|| {
// If we already have a filter, keep it. Otherwise, create a new one.
self.filter.clone().unwrap_or_else(|| {
let children = self
.expr
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
.collect::<Vec<_>>();
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
})
self.filter.clone().unwrap_or_else(|| self.create_filter())
});
SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
common_sort_prefix: self.common_sort_prefix.clone(),
fetch,
cache,
filter,
}
let mut new_sort = self.cloned();
new_sort.fetch = fetch;
new_sort.cache = cache;
new_sort.filter = filter;
new_sort
}

/// Input schema
Expand Down Expand Up @@ -1116,10 +1127,46 @@ impl ExecutionPlan for SortExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
.with_fetch(self.fetch)
.with_preserve_partitioning(self.preserve_partitioning);
new_sort.filter = self.filter.clone();
let mut new_sort = self.cloned();
assert!(
children.len() == 1,
"SortExec should have exactly one child"
);
new_sort.input = Arc::clone(&children[0]);
// Recompute the properties based on the new input since they may have changed.
let (cache, sort_prefix) = Self::compute_properties(
&new_sort.input,
new_sort.expr.clone(),
new_sort.preserve_partitioning,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we put the logic into reset_state if the changes are due to the reset_state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for this change is:

  1. We need to do similar work as with_new_children (i.e. clone SortExec) but each method has slightly different requirements (with_new_children needs to reset cache while reset_state needs to reset filter.
  2. To solve this I created the new SortExec::cloned which does neither of those two things and moved the resetting of cache into with_new_children and the resetting of filter into reset_state.

In other words, it doesn't make sense to put Self::compute_properties(...) in reset_state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function returns a Result though, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good point yep I'll do that!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed your original point

.expect(concat!(
"Safety: we had already been calling `compute_properties(...).unwrap()` in `new()` ",
"and it seems to be okay",
"\n",
"We assumed that doing the same thing here directly instead ",
"of calling `new()` (as we did before this commit) is also okay but it's possible that ",
"implementations have drifted and this is no longer safe even if `new()` still works, ",
"for example if `new()` now does something different than just calling `compute_properties(...).unwrap()`",
"\n",
"This is clearly a bug, please report it!"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should return DatafusionError::Internal here rather than panic'ing as it is a better UX (if you want fail fast in debug builds, perhaps you could add a debug_assert)

I also recommend converting the explanation into comments and leaving the panic message like "Internal inconsistency in SortExec"

The rationale is that if a user sees this message it is not going to mean anything to them and they can't fix it, and this text will obscure the conclusion (this is a bug they can not do anything to fix). A developer will come to the source location and can read the comment.

Copy link
Contributor Author

@adriangb adriangb Aug 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb this already had the possibility to panic in it because it called SortExec::new():

impl SortExec {
/// Create a new sort execution plan that produces a single,
/// sorted output partition.
pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
let preserve_partitioning = false;
let (cache, sort_prefix) =
Self::compute_properties(&input, expr.clone(), preserve_partitioning)
.unwrap();

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))

Returning a Result would be a major breaking change to the ExecutionPlan::with_new_children API which I don't think we should do in this PR.

I do think ExecutionPlan::with_new_children returning a Result would be a good thing. In general I think trait methods should err on the side of returning a result in case some implementation needs to. If none of them do I'd expect compilation to make it pretty much a non issue for performance. But maybe let's do that as it's own PR if we really want to.

));
new_sort.cache = cache;
new_sort.common_sort_prefix = sort_prefix;

Ok(Arc::new(new_sort))
}

fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
let children = self.children().into_iter().cloned().collect();
let new_sort = self.with_new_children(children)?;
let mut new_sort = new_sort
.as_any()
.downcast_ref::<SortExec>()
.expect("cloned 1 lines above this line, we know the type")
.clone();
// Our dynamic filter and execution metrics are the state we need to reset.
new_sort.filter = Some(new_sort.create_filter());
new_sort.metrics_set = ExecutionPlanMetricsSet::new();

Ok(Arc::new(new_sort))
}
Expand Down
24 changes: 23 additions & 1 deletion datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,28 @@ physical_plan
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)------------WorkTableExec: name=numbers

# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions
query II
with recursive r as (
select 0 as k, 0 as v
union all
(
select *
from r
order by v
limit 1
)
)
select *
from r
limit 5;
----
0 0
0 0
0 0
0 0
0 0

statement count 0
set datafusion.execution.enable_recursive_ctes = false;

Expand All @@ -1004,4 +1026,4 @@ explain WITH RECURSIVE numbers AS (
select 1 as n
UNION ALL
select n + 1 FROM numbers WHERE N < 10
) select * from numbers;
) select * from numbers;
10 changes: 10 additions & 0 deletions docs/source/library-user-guide/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ This version of DataFusion upgrades the underlying Apache Arrow implementation
to version `56.0.0`. See the [release notes](https://github.com/apache/arrow-rs/releases/tag/56.0.0)
for more details.

### Added `ExecutionPlan::reset_state`

In order to fix a bug in DataFusion `49.0.0` where dynamic filters (currently only generated in the precense of a query such as `ORDER BY ... LIMIT ...`)
produced incorrect results in recursive queries, a new method `reset_state` has been added to the `ExecutionPlan` trait.

Any `ExecutionPlan` that needs to maintain internal state or references to other nodes in the execution plan tree should implement this method to reset that state.
See [#17028] for more details and an example implementation for `SortExec`.

[#17028]: https://github.com/apache/datafusion/pull/17028

## DataFusion `49.0.0`

### `MSRV` updated to 1.85.1
Expand Down