diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index a9a4e23233b7..47e2b18ff7ad 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -323,6 +323,19 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { } } +/// Cloning constructs a new `DynamicFilterPhysicalExpr` via `new` from this +/// expression's children and a snapshot of its current inner expression. +/// If `current()` returns an error, the literal `true` is used as the inner +/// expression instead. +impl Clone for DynamicFilterPhysicalExpr { + fn clone(&self) -> Self { + DynamicFilterPhysicalExpr::new( + self.children().into_iter().cloned().collect(), + self.current().unwrap_or_else(|_| crate::expressions::lit(true)), + ) + } +} + #[cfg(test)] mod test { use crate::{ diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 6b402528c1c8..db37d8bab884 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -194,6 +194,24 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>>; + /// Returns a new `ExecutionPlan` with fresh state, where the existing child + /// is replaced by `child`. + /// + /// This method is intended for cases where you need a plan with fresh state + /// rather than preserving existing state (which `with_new_children` does). + /// For example, recursive queries need fresh state for each iteration to avoid + /// sharing stateful expressions across multiple executions. + /// + /// The default implementation simply calls `with_new_children()` with `child` + /// as the only child, preserving the existing behavior. ExecutionPlan + /// implementations that need to provide fresh state should override this method. + fn with_fresh_state( + self: Arc<Self>, + child: &Arc<dyn ExecutionPlan>, + ) -> Result<Arc<dyn ExecutionPlan>> { + self.with_new_children(vec![Arc::clone(child)]) + } + /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to /// produce `target_partitions` partitions. 
/// diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 99b460dfcfdc..6b2b85702c4e 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -384,7 +384,7 @@ fn reset_plan_states(plan: Arc) -> Result, + child: &Arc<dyn ExecutionPlan>, + ) -> Result<Arc<dyn ExecutionPlan>> { + let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(child)) + .with_fetch(self.fetch) + .with_preserve_partitioning(self.preserve_partitioning); + + // Clone the dynamic filter so the new plan does not share it with `self`. + // This fixes issue #16998 where SortExec shares DynamicFilterPhysicalExpr + // across multiple query executions, causing recursive queries to fail + new_sort.filter = self.filter.as_ref().map(|f| Arc::new(f.as_ref().clone())); + + Ok(Arc::new(new_sort)) + } + fn execute( &self, partition: usize, @@ -2055,4 +2071,68 @@ mod tests { "#); Ok(()) } + + #[tokio::test] + async fn test_sort_exec_filter_cloning_issue_16998() -> Result<()> { + // Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions + // This test ensures that when with_fresh_state is called multiple times (as happens in + // recursive queries), each SortExec instance gets its own copy of the dynamic filter. 
+ + async fn collect_stream(mut stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> { + let mut batches = Vec::new(); + while let Some(batch) = stream.next().await { + batches.push(batch?); + } + Ok(batches) + } + + let task_ctx = Arc::new(TaskContext::default()); + let schema = Arc::new(Schema::new(vec![ + Field::new("v", DataType::Int32, false), + ])); + + // Create test data + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![0, 1, 2]))], + )?; + + let input = TestMemoryExec::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)?; + + // Create a SortExec with a fetch limit (which creates a dynamic filter) + let sort_exec = Arc::new(SortExec::new( + [PhysicalSortExpr { + expr: col("v", &schema)?, + options: SortOptions::default(), + }].into(), + input, + ).with_fetch(Some(1))); + + // Call with_fresh_state multiple times to simulate recursive query scenario + // This should create independent copies of the dynamic filter + let new_child = Arc::clone(sort_exec.input()); + let sort_exec2 = Arc::clone(&sort_exec).with_fresh_state(&new_child)?; + let sort_exec3 = Arc::clone(&sort_exec).with_fresh_state(&new_child)?; + + // Execute both to ensure they work independently without shared state issues + let stream1 = sort_exec2.execute(0, Arc::clone(&task_ctx))?; + let stream2 = sort_exec3.execute(0, Arc::clone(&task_ctx))?; + + // Collect results from both streams - this would fail before the fix + // because the shared filter would cause state corruption + let result1 = collect_stream(stream1).await?; + let result2 = collect_stream(stream2).await?; + + // Both should return exactly 1 row due to the fetch limit + assert_eq!(result1.iter().map(|b| b.num_rows()).sum::<usize>(), 1); + assert_eq!(result2.iter().map(|b| b.num_rows()).sum::<usize>(), 1); + + // The values should be the same (smallest value due to sort) + let val1 = result1[0].column(0).as_any().downcast_ref::<Int32Array>().unwrap().value(0); + let val2 = 
result2[0].column(0).as_any().downcast_ref::<Int32Array>().unwrap().value(0); + assert_eq!(val1, 0); + assert_eq!(val2, 0); + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 32320a06f4fb..5c42899b554f 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -996,6 +996,29 @@ physical_plan 08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)------------WorkTableExec: name=numbers +# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions +# This query must return 5 rows; before the fix it returned only 2 +query II +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +0 0 +0 0 +0 0 +0 0 +0 0 + statement count 0 set datafusion.execution.enable_recursive_ctes = false;