From 6147f9e06e42bd632f435ec085c565b63279aff8 Mon Sep 17 00:00:00 2001 From: Robert Ream Date: Thu, 31 Jul 2025 14:36:30 -0700 Subject: [PATCH 1/2] Test and fix for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions --- datafusion/physical-plan/src/sorts/sort.rs | 77 +++++++++++++++++++++- datafusion/sqllogictest/test_files/cte.slt | 23 +++++++ 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 0b7d3977d2707..9fe45a446b245 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1119,7 +1119,18 @@ impl ExecutionPlan for SortExec { let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) .with_fetch(self.fetch) .with_preserve_partitioning(self.preserve_partitioning); - new_sort.filter = self.filter.clone(); + // Fully clone the filter to avoid sharing across multiple executions + // This fixes issue #16998 where SortExec shares DynamicFilterPhysicalExpr + // across multiple query executions, causing recursive queries to fail + new_sort.filter = self.filter.as_ref().map(|f| { + Arc::new(DynamicFilterPhysicalExpr::new( + f.children().into_iter().cloned().collect(), + f.current().unwrap_or_else(|_| { + // Fallback to a true literal if we can't get the current expression + lit(true) + }) + )) + }); Ok(Arc::new(new_sort)) } @@ -2055,4 +2066,68 @@ mod tests { "#); Ok(()) } + + #[tokio::test] + async fn test_sort_exec_filter_cloning_issue_16998() -> Result<()> { + // Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions + // This test ensures that when with_new_children is called multiple times (as happens in + // recursive queries), each SortExec instance gets its own copy of the dynamic filter. 
+ + async fn collect_stream(mut stream: SendableRecordBatchStream) -> Result> { + let mut batches = Vec::new(); + while let Some(batch) = stream.next().await { + batches.push(batch?); + } + Ok(batches) + } + + let task_ctx = Arc::new(TaskContext::default()); + let schema = Arc::new(Schema::new(vec![ + Field::new("v", DataType::Int32, false), + ])); + + // Create test data + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![0, 1, 2]))], + )?; + + let input = TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?; + + // Create a SortExec with a fetch limit (which creates a dynamic filter) + let sort_exec = Arc::new(SortExec::new( + [PhysicalSortExpr { + expr: col("v", &schema)?, + options: SortOptions::default(), + }].into(), + input, + ).with_fetch(Some(1))); + + // Call with_new_children multiple times to simulate recursive query scenario + // This should create independent copies of the dynamic filter + let new_children = vec![sort_exec.input().clone()]; + let sort_exec2 = sort_exec.clone().with_new_children(new_children.clone())?; + let sort_exec3 = sort_exec.clone().with_new_children(new_children)?; + + // Execute both to ensure they work independently without shared state issues + let stream1 = sort_exec2.execute(0, Arc::clone(&task_ctx))?; + let stream2 = sort_exec3.execute(0, Arc::clone(&task_ctx))?; + + // Collect results from both streams - this would fail before the fix + // because the shared filter would cause state corruption + let result1 = collect_stream(stream1).await?; + let result2 = collect_stream(stream2).await?; + + // Both should return exactly 1 row due to the fetch limit + assert_eq!(result1.iter().map(|b| b.num_rows()).sum::(), 1); + assert_eq!(result2.iter().map(|b| b.num_rows()).sum::(), 1); + + // The values should be the same (smallest value due to sort) + let val1 = result1[0].column(0).as_any().downcast_ref::().unwrap().value(0); + let val2 = 
result2[0].column(0).as_any().downcast_ref::().unwrap().value(0); + assert_eq!(val1, 0); + assert_eq!(val2, 0); + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 32320a06f4fb0..5c42899b554fa 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -996,6 +996,29 @@ physical_plan 08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)------------WorkTableExec: name=numbers +# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions +# This should return 5 rows; before the fix it returned only 2 due to the bug +query II +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +0 0 +0 0 +0 0 +0 0 +0 0 + statement count 0 set datafusion.execution.enable_recursive_ctes = false; From 6ddfe698cdcaedaab2b0025fd51825797d7c83ce Mon Sep 17 00:00:00 2001 From: Robert Ream Date: Sat, 2 Aug 2025 06:58:15 -0700 Subject: [PATCH 2/2] PR feedback --- .../src/expressions/dynamic_filters.rs | 8 +++++ .../physical-plan/src/execution_plan.rs | 18 ++++++++++ .../physical-plan/src/recursive_query.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 35 +++++++++++-------- 4 files changed, 47 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index a9a4e23233b7c..47e2b18ff7adc 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -323,6 +323,14 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { } } +impl Clone for DynamicFilterPhysicalExpr { + fn clone(&self) -> Self { + DynamicFilterPhysicalExpr::new( + self.children().into_iter().cloned().collect(), + self.current().unwrap_or_else(|_| crate::expressions::lit(true))) 
+ } +} + #[cfg(test)] mod test { use crate::{ diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 6b402528c1c8f..db37d8bab884f 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -194,6 +194,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec>, ) -> Result>; + /// Returns a new `ExecutionPlan` with fresh state, where the existing child + /// is replaced by `child`. + /// + /// This method is intended for cases where you need a plan with fresh state + /// rather than preserving existing state (which `with_new_children` does). + /// For example, recursive queries need fresh state for each iteration to avoid + /// sharing stateful expressions across multiple executions. + /// + /// The default implementation simply calls `with_new_children()`, preserving + /// the existing behavior. ExecutionPlan implementations that need to provide + /// fresh state should override this method. + fn with_fresh_state( + self: Arc, + child: &Arc, + ) -> Result> { + self.with_new_children(vec![child.clone()]) + } + /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to /// produce `target_partitions` partitions. 
/// diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 99b460dfcfdcd..6b2b85702c4ee 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -384,7 +384,7 @@ fn reset_plan_states(plan: Arc) -> Result, + child: &Arc, + ) -> Result> { + let mut new_sort = SortExec::new(self.expr.clone(), child.clone()) + .with_fetch(self.fetch) + .with_preserve_partitioning(self.preserve_partitioning); + + // Create fresh filter instances to avoid sharing across multiple executions // This fixes issue #16998 where SortExec shares DynamicFilterPhysicalExpr // across multiple query executions, causing recursive queries to fail - new_sort.filter = self.filter.as_ref().map(|f| { - Arc::new(DynamicFilterPhysicalExpr::new( - f.children().into_iter().cloned().collect(), - f.current().unwrap_or_else(|_| { - // Fallback to a true literal if we can't get the current expression - lit(true) - }) - )) - }); + new_sort.filter = self.filter.as_ref().map(|f| Arc::new(f.as_ref().clone())); Ok(Arc::new(new_sort)) } @@ -2070,7 +2075,7 @@ mod tests { #[tokio::test] async fn test_sort_exec_filter_cloning_issue_16998() -> Result<()> { // Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions - // This test ensures that when with_new_children is called multiple times (as happens in + // This test ensures that when with_fresh_state is called multiple times (as happens in // recursive queries), each SortExec instance gets its own copy of the dynamic filter. 
async fn collect_stream(mut stream: SendableRecordBatchStream) -> Result> { @@ -2103,11 +2108,11 @@ mod tests { input, ).with_fetch(Some(1))); - // Call with_new_children multiple times to simulate recursive query scenario + // Call with_fresh_state multiple times to simulate recursive query scenario // This should create independent copies of the dynamic filter - let new_children = vec![sort_exec.input().clone()]; - let sort_exec2 = sort_exec.clone().with_new_children(new_children.clone())?; - let sort_exec3 = sort_exec.clone().with_new_children(new_children)?; + let new_child = sort_exec.input().clone(); + let sort_exec2 = sort_exec.clone().with_fresh_state(&new_child)?; + let sort_exec3 = sort_exec.clone().with_fresh_state(&new_child)?; // Execute both to ensure they work independently without shared state issues let stream1 = sort_exec2.execute(0, Arc::clone(&task_ctx))?;