diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 3bc41d2652d9a..38c87b9bc6e41 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -25,7 +25,7 @@ use crate::utils::scatter; use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; @@ -327,7 +327,10 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { /// /// Note for implementers: this method should *not* handle recursion. /// Recursion is handled in [`snapshot_physical_expr`]. - fn snapshot(&self) -> Result>> { + fn snapshot( + &self, + remapped_schema: Option, + ) -> Result>> { // By default, we return None to indicate that this PhysicalExpr does not // have any dynamic references or state. // This is a safe default behavior. @@ -513,9 +516,10 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ { /// any dynamic references or state, it returns `None`. pub fn snapshot_physical_expr( expr: Arc, + schema: Option, ) -> Result> { expr.transform_up(|e| { - if let Some(snapshot) = e.snapshot()? { + if let Some(snapshot) = e.snapshot(schema.clone())? { Ok(Transformed::yes(snapshot)) } else { Ok(Transformed::no(Arc::clone(&e))) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index c0a3285f0e781..10d63bd150d73 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -23,27 +23,22 @@ use std::{ }; use crate::PhysicalExpr; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode}, Result, }; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; +use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash, PhysicalExprRef}; + +use super::Column; /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. #[derive(Debug)] pub struct DynamicFilterPhysicalExpr { - /// The original children of this PhysicalExpr, if any. - /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) - /// and later remapped to the actual expressions that are being filtered. - /// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly. - children: Vec>, - /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children - /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. - remapped_children: Option>>, /// The source of dynamic filters. - inner: Arc>>, + inner: PhysicalExprRef, + /// For testing purposes track the data type and nullability to make sure they don't change. /// If they do, there's a bug in the implementation. /// But this can have overhead in production, so it's only included in our tests. @@ -53,20 +48,13 @@ pub struct DynamicFilterPhysicalExpr { impl Hash for DynamicFilterPhysicalExpr { fn hash(&self, state: &mut H) { - let inner = self.current().expect("Failed to get current expression"); - inner.dyn_hash(state); - self.children.dyn_hash(state); - self.remapped_children.dyn_hash(state); + todo!("") } } impl PartialEq for DynamicFilterPhysicalExpr { fn eq(&self, other: &Self) -> bool { - let inner = self.current().expect("Failed to get current expression"); - let our_children = self.remapped_children.as_ref().unwrap_or(&self.children); - let other_children = other.remapped_children.as_ref().unwrap_or(&other.children); - let other = other.current().expect("Failed to get current expression"); - inner.dyn_eq(other.as_any()) && our_children == other_children + todo!("") } } @@ -105,89 +93,48 @@ impl DynamicFilterPhysicalExpr { /// [`collect_columns`]: crate::utils::collect_columns #[allow(dead_code)] // Only used in tests for now pub fn new( - children: Vec>, + // children: Vec>, inner: Arc, ) -> Self { Self { - children, - remapped_children: None, // Initially no remapped children - inner: Arc::new(RwLock::new(inner)), + inner, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), } } - fn remap_children( - children: &[Arc], - remapped_children: Option<&Vec>>, - expr: Arc, - ) -> Result> { - if let Some(remapped_children) = remapped_children { - // Remap the children to the new children - // of the expression. - expr.transform_up(|child| { - // Check if this is any of our original children - if let Some(pos) = - children.iter().position(|c| c.as_ref() == child.as_ref()) - { - // If so, remap it to the current children - // of the expression. - let new_child = Arc::clone(&remapped_children[pos]); - Ok(Transformed::yes(new_child)) - } else { - // Otherwise, just return the expression - Ok(Transformed::no(child)) - } - }) - .data() - } else { - // If we don't have any remapped children, just return the expression - Ok(Arc::clone(&expr)) - } - } + // udpate schema + // pub fn with_schema(&self, schema: SchemaRef) -> Self { + // Self { + // remapped_schema: Some(schema), + // inner: Arc::clone(&self.inner), + // data_type: Arc::clone(&self.data_type), + // nullable: Arc::clone(&self.nullable), + // } + // } - /// Get the current expression. - /// This will return the current expression with any children - /// remapped to match calls to [`PhysicalExpr::with_new_children`]. + // get the source filter pub fn current(&self) -> Result> { - let inner = self - .inner - .read() - .map_err(|_| { - datafusion_common::DataFusionError::Execution( - "Failed to acquire read lock for inner".to_string(), - ) - })? - .clone(); - let inner = - Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?; + let inner = Arc::clone(&self.inner); + + // let inner = self + // .inner + // .read() + // .map_err(|_| { + // datafusion_common::DataFusionError::Execution( + // "Failed to acquire read lock for inner".to_string(), + // ) + // })? + // .clone(); Ok(inner) } - /// Update the current expression. - /// Any children of this expression must be a subset of the original children - /// passed to the constructor. - /// This should be called e.g.: - /// - When we've computed the probe side's hash table in a HashJoinExec - /// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach. - #[allow(dead_code)] // Only used in tests for now - pub fn update(&self, new_expr: Arc) -> Result<()> { - let mut current = self.inner.write().map_err(|_| { - datafusion_common::DataFusionError::Execution( - "Failed to acquire write lock for inner".to_string(), - ) - })?; - // Remap the children of the new expression to match the original children - // We still do this again in `current()` but doing it preventively here - // reduces the work needed in some cases if `current()` is called multiple times - // and the same externally facing `PhysicalExpr` is used for both `with_new_children` and `update()`.` - let new_expr = Self::remap_children( - &self.children, - self.remapped_children.as_ref(), - new_expr, - )?; - *current = new_expr; - Ok(()) + // update source filter + // create a new one + pub fn update(&mut self, filter: PhysicalExprRef) { + self.inner = filter; + // let mut w = self.inner.write().unwrap(); + // *w = filter; } } @@ -197,24 +144,17 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { } fn children(&self) -> Vec<&Arc> { - self.remapped_children - .as_ref() - .unwrap_or(&self.children) - .iter() - .collect() + todo!("") } + // update source filter fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { - Ok(Arc::new(Self { - children: self.children.clone(), - remapped_children: Some(children), - inner: Arc::clone(&self.inner), - data_type: Arc::clone(&self.data_type), - nullable: Arc::clone(&self.nullable), - })) + debug_assert_eq!(children.len(), 1); + let inner = children.swap_remove(0); + Ok(Arc::new(Self::new(inner))) } fn data_type(&self, input_schema: &Schema) -> Result { @@ -287,9 +227,40 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { inner.fmt_sql(f) } - fn snapshot(&self) -> Result>> { - // Return the current expression as a snapshot. - Ok(Some(self.current()?)) + // snapshot with given schema based on the source filter. + // only evalute is expected to be called after this output. no schema or source filter are updated for the snapshot. + fn snapshot( + &self, + remapped_schema: Option, + ) -> Result> { + if let Some(remapped_schema) = remapped_schema { + let pred = self.current()?; + let new_pred = pred + .transform_up(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + let index = match remapped_schema.index_of(col.name()) { + Ok(idx) => idx, + Err(_) => { + return Err(datafusion_common::DataFusionError::Plan( + format!("Column {} not found in schema", col.name()), + )) + } + }; + return Ok(Transformed::yes(Arc::new(Column::new( + col.name(), + index, + )))); + } else { + // If the expression is not a column, just return it + return Ok(Transformed::no(expr)); + } + }) + .data()?; + + Ok(Some(new_pred)) + } else { + Ok(Some(self.current()?)) + } } } @@ -304,6 +275,7 @@ mod test { datatypes::{DataType, Field, Schema}, }; use datafusion_common::ScalarValue; + use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; use super::*; @@ -319,7 +291,7 @@ mod test { lit(42) as Arc, )); let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( - vec![col("a", &table_schema).unwrap()], + // vec![col("a", &table_schema).unwrap()], expr as Arc, )); // Simulate two `ParquetSource` files with different filter schemas @@ -335,22 +307,16 @@ mod test { ])); // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr // and remaps the children to the file schema. - let dynamic_filter_1 = reassign_predicate_columns( - Arc::clone(&dynamic_filter) as Arc, - &filter_schema_1, - false, - ) - .unwrap(); - let snap = dynamic_filter_1.snapshot().unwrap().unwrap(); - insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); - let dynamic_filter_2 = reassign_predicate_columns( - Arc::clone(&dynamic_filter) as Arc, - &filter_schema_2, - false, - ) - .unwrap(); - let snap = dynamic_filter_2.snapshot().unwrap().unwrap(); - insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); + let snap_1 = dynamic_filter + .snapshot(Some(Arc::clone(&filter_schema_1))) + .unwrap() + .unwrap(); + insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); + let snap_2 = dynamic_filter + .snapshot(Some(Arc::clone(&filter_schema_2))) + .unwrap() + .unwrap(); + insta::assert_snapshot!(format!("{snap_2:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); // Both filters allow evaluating the same expression let batch_1 = RecordBatch::try_new( Arc::clone(&filter_schema_1), @@ -373,8 +339,8 @@ mod test { ) .unwrap(); // Evaluate the expression on both batches - let result_1 = dynamic_filter_1.evaluate(&batch_1).unwrap(); - let result_2 = dynamic_filter_2.evaluate(&batch_2).unwrap(); + let result_1 = snap_1.evaluate(&batch_1).unwrap(); + let result_2 = snap_2.evaluate(&batch_2).unwrap(); // Check that the results are the same let ColumnarValue::Array(arr_1) = result_1 else { panic!("Expected ColumnarValue::Array"); @@ -393,13 +359,26 @@ mod test { col("a", &table_schema).unwrap(), datafusion_expr::Operator::Gt, lit(43) as Arc, - )); - dynamic_filter - .update(Arc::clone(&new_expr) as Arc) - .expect("Failed to update expression"); + )) as PhysicalExprRef; + + let dynamic_filter = dynamic_filter + .with_new_children(vec![new_expr]) + .expect("Failed to update children"); + // dynamic_filter.update(new_expr); + + let snap_1 = dynamic_filter + .snapshot(Some(Arc::clone(&filter_schema_1))) + .unwrap() + .unwrap(); + let snap_2 = dynamic_filter + .snapshot(Some(Arc::clone(&filter_schema_2))) + .unwrap() + .unwrap(); + // Now we should be able to evaluate the new expression on both batches - let result_1 = dynamic_filter_1.evaluate(&batch_1).unwrap(); - let result_2 = dynamic_filter_2.evaluate(&batch_2).unwrap(); + let result_1 = snap_1.evaluate(&batch_1).unwrap(); + let result_2 = snap_2.evaluate(&batch_2).unwrap(); + // Check that the results are the same let ColumnarValue::Array(arr_1) = result_1 else { panic!("Expected ColumnarValue::Array"); @@ -417,24 +396,27 @@ mod test { #[test] fn test_snapshot() { let expr = lit(42) as Arc; - let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); + let dynamic_filter = DynamicFilterPhysicalExpr::new(Arc::clone(&expr)); // Take a snapshot of the current expression - let snapshot = dynamic_filter.snapshot().unwrap(); - assert_eq!(snapshot, Some(expr)); + let snapshot = dynamic_filter.snapshot(None).unwrap().unwrap(); + assert_eq!(&snapshot, &expr); // Update the current expression let new_expr = lit(100) as Arc; - dynamic_filter.update(Arc::clone(&new_expr)).unwrap(); + let df = Arc::new(dynamic_filter) as PhysicalExprRef; + let df = df + .with_new_children(vec![new_expr.clone()]) + .expect("Failed to update expression"); + // dynamic_filter.update(Arc::clone(&new_expr)); // Take another snapshot - let snapshot = dynamic_filter.snapshot().unwrap(); - assert_eq!(snapshot, Some(new_expr)); + let snapshot = df.snapshot(None).unwrap().unwrap(); + assert_eq!(&snapshot, &new_expr); } #[test] fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { - let dynamic_filter = - DynamicFilterPhysicalExpr::new(vec![], lit(42) as Arc); + let mut dynamic_filter = DynamicFilterPhysicalExpr::new(lit(42)); // First call to data_type and nullable should set the initial values. let initial_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap(); @@ -453,9 +435,7 @@ mod test { ); // Now change the current expression to something else. - dynamic_filter - .update(lit(ScalarValue::Utf8(None)) as Arc) - .expect("Failed to update expression"); + dynamic_filter.update(lit(ScalarValue::Utf8(None))); // Check that we error if we call data_type, nullable or evaluate after changing the expression. assert!( dynamic_filter.data_type(&Schema::empty()).is_err(), @@ -466,9 +446,9 @@ mod test { "Expected err when nullable is called after changing the expression." ); let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); - assert!( - dynamic_filter.evaluate(&batch).is_err(), - "Expected err when evaluate is called after changing the expression." - ); + + let snap = dynamic_filter.snapshot(None).unwrap().unwrap(); + // this is changed to ok, but makes sense + assert!(snap.evaluate(&batch).is_ok(),); } }