Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,14 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
}
}

impl Clone for DynamicFilterPhysicalExpr {
fn clone(&self) -> Self {
DynamicFilterPhysicalExpr::new(
self.children().into_iter().cloned().collect(),
self.current().unwrap_or_else(|_| crate::expressions::lit(true)))
}
}

#[cfg(test)]
mod test {
use crate::{
Expand Down
18 changes: 18 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;

/// Returns a new `ExecutionPlan` with fresh state, where all existing children
/// were replaced by the `children`, in order.
///
/// This method is intended for cases where you need a plan with fresh state
/// rather than preserving existing state (which `with_new_children` does).
/// For example, recursive queries need fresh state for each iteration to avoid
/// sharing stateful expressions across multiple executions.
///
/// The default implementation simply calls `with_new_children()`, preserving
/// the existing behavior. ExecutionPlan implementations that need to provide
/// fresh state should override this method.
fn with_fresh_state(
self: Arc<Self>,
child: &Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
self.with_new_children(vec![child.clone()])
}

/// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
/// produce `target_partitions` partitions.
///
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPl
Ok(Transformed::no(plan))
} else {
let new_plan = Arc::clone(&plan)
.with_new_children(plan.children().into_iter().cloned().collect())?;
.with_fresh_state(&plan.children()[0])?;
Ok(Transformed::yes(new_plan))
}
})
Expand Down
80 changes: 80 additions & 0 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,22 @@ impl ExecutionPlan for SortExec {
Ok(Arc::new(new_sort))
}

fn with_fresh_state(
self: Arc<Self>,
child: &Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut new_sort = SortExec::new(self.expr.clone(), child.clone())
.with_fetch(self.fetch)
.with_preserve_partitioning(self.preserve_partitioning);

// Create fresh filter instances to avoid sharing across multiple executions
// This fixes issue #16998 where SortExec shares DynamicFilterPhysicalExpr
// across multiple query executions, causing recursive queries to fail
new_sort.filter = self.filter.as_ref().map(|f| Arc::new(f.as_ref().clone()));

Ok(Arc::new(new_sort))
}

fn execute(
&self,
partition: usize,
Expand Down Expand Up @@ -2055,4 +2071,68 @@ mod tests {
"#);
Ok(())
}

#[tokio::test]
async fn test_sort_exec_filter_cloning_issue_16998() -> Result<()> {
// Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions
// This test ensures that when with_fresh_state is called multiple times (as happens in
// recursive queries), each SortExec instance gets its own copy of the dynamic filter.

async fn collect_stream(mut stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
let mut batches = Vec::new();
while let Some(batch) = stream.next().await {
batches.push(batch?);
}
Ok(batches)
}

let task_ctx = Arc::new(TaskContext::default());
let schema = Arc::new(Schema::new(vec![
Field::new("v", DataType::Int32, false),
]));

// Create test data
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int32Array::from(vec![0, 1, 2]))],
)?;

let input = TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?;

// Create a SortExec with a fetch limit (which creates a dynamic filter)
let sort_exec = Arc::new(SortExec::new(
[PhysicalSortExpr {
expr: col("v", &schema)?,
options: SortOptions::default(),
}].into(),
input,
).with_fetch(Some(1)));

// Call with_fresh_state multiple times to simulate recursive query scenario
// This should create independent copies of the dynamic filter
let new_child = sort_exec.input().clone();
let sort_exec2 = sort_exec.clone().with_fresh_state(&new_child)?;
let sort_exec3 = sort_exec.clone().with_fresh_state(&new_child)?;

// Execute both to ensure they work independently without shared state issues
let stream1 = sort_exec2.execute(0, Arc::clone(&task_ctx))?;
let stream2 = sort_exec3.execute(0, Arc::clone(&task_ctx))?;

// Collect results from both streams - this would fail before the fix
// because the shared filter would cause state corruption
let result1 = collect_stream(stream1).await?;
let result2 = collect_stream(stream2).await?;

// Both should return exactly 1 row due to the fetch limit
assert_eq!(result1.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
assert_eq!(result2.iter().map(|b| b.num_rows()).sum::<usize>(), 1);

// The values should be the same (smallest value due to sort)
let val1 = result1[0].column(0).as_any().downcast_ref::<Int32Array>().unwrap().value(0);
let val2 = result2[0].column(0).as_any().downcast_ref::<Int32Array>().unwrap().value(0);
assert_eq!(val1, 0);
assert_eq!(val2, 0);

Ok(())
}
}
23 changes: 23 additions & 0 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,29 @@ physical_plan
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)------------WorkTableExec: name=numbers

# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions
# This should return 5 rows but currently returns only 2 due to the bug
query II
with recursive r as (
select 0 as k, 0 as v
union all
(
select *
from r
order by v
limit 1
)
)
select *
from r
limit 5;
----
0 0
0 0
0 0
0 0
0 0

statement count 0
set datafusion.execution.enable_recursive_ctes = false;

Expand Down