diff --git a/Cargo.toml b/Cargo.toml index aa1ba1f214d5..6a6928e25bdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ homepage = "https://datafusion.apache.org" license = "Apache-2.0" readme = "README.md" repository = "https://github.com/apache/datafusion" -rust-version = "1.75" +rust-version = "1.76" version = "39.0.0" [workspace.dependencies] diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 8f4b3cd81f36..8578476ed43d 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -26,7 +26,7 @@ license = "Apache-2.0" homepage = "https://datafusion.apache.org" repository = "https://github.com/apache/datafusion" # Specify MSRV here as `cargo msrv` doesn't support workspace version -rust-version = "1.75" +rust-version = "1.76" readme = "README.md" [dependencies] diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 7eecdec8abef..c972536c4d23 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -36,7 +36,7 @@ use crate::error::{Result, _internal_err}; // Combines two hashes into one hash #[inline] -fn combine_hashes(l: u64, r: u64) -> u64 { +pub fn combine_hashes(l: u64, r: u64) -> u64 { let hash = (17 * 37u64).wrapping_add(l); hash.wrapping_mul(37).wrapping_add(r) } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 45617d88dc0c..532ca8fde9e7 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -30,7 +30,7 @@ authors = { workspace = true } # Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with # "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'" # https://github.com/foresterre/cargo-msrv/issues/590 -rust-version = "1.75" +rust-version = "1.76" [lints] workspace = true diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 409ccf7b47c7..b1debf24e722 100644 --- a/datafusion/expr/src/expr.rs +++ 
b/datafusion/expr/src/expr.rs @@ -19,7 +19,8 @@ use std::collections::HashSet; use std::fmt::{self, Display, Formatter, Write}; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; +use std::mem; use std::str::FromStr; use std::sync::Arc; @@ -1461,6 +1462,176 @@ impl Expr { | Expr::Placeholder(..) => false, } } + + /// Hashes the direct content of an `Expr` without recursing into its children. + /// + /// This method is useful to incrementally compute hashes, such as in + /// `CommonSubexprEliminate` which builds a deep hash of a node and its descendants + /// during the bottom-up phase of the first traversal and so avoids computing the hash + /// of the node and then the hash of its descendants separately. + /// + /// If a node doesn't have any children then this method is similar to `.hash()`, but + /// it does not necessarily return the same value. + /// + /// As it is pretty easy to forget to update this method when `Expr` changes, the + /// implementation doesn't use wildcard patterns (`..`, `_`) so that such changes are + /// caught at compile time.
+ pub fn hash_node(&self, hasher: &mut H) { + mem::discriminant(self).hash(hasher); + match self { + Expr::Alias(Alias { + expr: _expr, + relation, + name, + }) => { + relation.hash(hasher); + name.hash(hasher); + } + Expr::Column(column) => { + column.hash(hasher); + } + Expr::ScalarVariable(data_type, name) => { + data_type.hash(hasher); + name.hash(hasher); + } + Expr::Literal(scalar_value) => { + scalar_value.hash(hasher); + } + Expr::BinaryExpr(BinaryExpr { + left: _left, + op, + right: _right, + }) => { + op.hash(hasher); + } + Expr::Like(Like { + negated, + expr: _expr, + pattern: _pattern, + escape_char, + case_insensitive, + }) + | Expr::SimilarTo(Like { + negated, + expr: _expr, + pattern: _pattern, + escape_char, + case_insensitive, + }) => { + negated.hash(hasher); + escape_char.hash(hasher); + case_insensitive.hash(hasher); + } + Expr::Not(_expr) + | Expr::IsNotNull(_expr) + | Expr::IsNull(_expr) + | Expr::IsTrue(_expr) + | Expr::IsFalse(_expr) + | Expr::IsUnknown(_expr) + | Expr::IsNotTrue(_expr) + | Expr::IsNotFalse(_expr) + | Expr::IsNotUnknown(_expr) + | Expr::Negative(_expr) => {} + Expr::Between(Between { + expr: _expr, + negated, + low: _low, + high: _high, + }) => { + negated.hash(hasher); + } + Expr::Case(Case { + expr: _expr, + when_then_expr: _when_then_expr, + else_expr: _else_expr, + }) => {} + Expr::Cast(Cast { + expr: _expr, + data_type, + }) + | Expr::TryCast(TryCast { + expr: _expr, + data_type, + }) => { + data_type.hash(hasher); + } + Expr::Sort(Sort { + expr: _expr, + asc, + nulls_first, + }) => { + asc.hash(hasher); + nulls_first.hash(hasher); + } + Expr::ScalarFunction(ScalarFunction { func, args: _args }) => { + func.hash(hasher); + } + Expr::AggregateFunction(AggregateFunction { + func_def, + args: _args, + distinct, + filter: _filter, + order_by: _order_by, + null_treatment, + }) => { + func_def.hash(hasher); + distinct.hash(hasher); + null_treatment.hash(hasher); + } + Expr::WindowFunction(WindowFunction { + fun, + args: 
_args, + partition_by: _partition_by, + order_by: _order_by, + window_frame, + null_treatment, + }) => { + fun.hash(hasher); + window_frame.hash(hasher); + null_treatment.hash(hasher); + } + Expr::InList(InList { + expr: _expr, + list: _list, + negated, + }) => { + negated.hash(hasher); + } + Expr::Exists(Exists { subquery, negated }) => { + subquery.hash(hasher); + negated.hash(hasher); + } + Expr::InSubquery(InSubquery { + expr: _expr, + subquery, + negated, + }) => { + subquery.hash(hasher); + negated.hash(hasher); + } + Expr::ScalarSubquery(subquery) => { + subquery.hash(hasher); + } + Expr::Wildcard { qualifier } => { + qualifier.hash(hasher); + } + Expr::GroupingSet(grouping_set) => { + mem::discriminant(grouping_set).hash(hasher); + match grouping_set { + GroupingSet::Rollup(_exprs) | GroupingSet::Cube(_exprs) => {} + GroupingSet::GroupingSets(_exprs) => {} + } + } + Expr::Placeholder(place_holder) => { + place_holder.hash(hasher); + } + Expr::OuterReferenceColumn(data_type, column) => { + data_type.hash(hasher); + column.hash(hasher); + } + Expr::Unnest(Unnest { expr: _expr }) => {} + }; + } } // modifies expr if it is a placeholder with datatype of right diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 66c23fdced63..e760845e043a 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -18,6 +18,7 @@ //! 
[`CommonSubexprEliminate`] to avoid redundant computation of common sub-expressions use std::collections::{BTreeSet, HashMap}; +use std::hash::{BuildHasher, Hash, Hasher, RandomState}; use std::sync::Arc; use crate::{OptimizerConfig, OptimizerRule}; @@ -25,11 +26,12 @@ use crate::{OptimizerConfig, OptimizerRule}; use crate::optimizer::ApplyOrder; use crate::utils::NamePreserver; use datafusion_common::alias::AliasGenerator; +use datafusion_common::hash_utils::combine_hashes; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor, }; use datafusion_common::{ - internal_datafusion_err, qualified_name, Column, DFSchema, Result, + internal_datafusion_err, qualified_name, Column, DFSchema, DFSchemaRef, Result, }; use datafusion_expr::expr::Alias; use datafusion_expr::logical_plan::tree_node::unwrap_arc; @@ -43,18 +45,37 @@ const CSE_PREFIX: &str = "__common_expr"; /// Identifier that represents a subexpression tree. /// -/// Note that the current implementation contains: -/// - the `Display` of an expression (a `String`) and -/// - the identifiers of the childrens of the expression -/// concatenated. -/// -/// An identifier should (ideally) be able to "hash", "accumulate", "equal" and "have no -/// collision (as low as possible)" -/// -/// Since an identifier is likely to be copied many times, it is better that an identifier -/// is small or "copy". otherwise some kinds of reference count is needed. String -/// description here is not such a good choose. -type Identifier = String; +/// This identifier is designed to be efficient and "hash", "accumulate", "equal" and +/// "have no collision (as low as possible)" +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +struct Identifier<'n> { + // Hash of `expr` built up incrementally during the first, visiting traversal, but its + // value is not necessarily equal to `expr.hash()`. 
+ hash: u64, + expr: &'n Expr, +} + +impl<'n> Identifier<'n> { + fn new(expr: &'n Expr, random_state: &RandomState) -> Self { + let mut hasher = random_state.build_hasher(); + expr.hash_node(&mut hasher); + let hash = hasher.finish(); + Self { hash, expr } + } + + fn combine(mut self, other: Option) -> Self { + other.map_or(self, |other_id| { + self.hash = combine_hashes(self.hash, other_id.hash); + self + }) + } +} + +impl Hash for Identifier<'_> { + fn hash(&self, state: &mut H) { + state.write_u64(self.hash); + } +} /// A cache that contains the postorder index and the identifier of expression tree nodes /// by the preorder index of the nodes. @@ -83,14 +104,14 @@ type Identifier = String; /// (0, "b") /// ] /// ``` -type IdArray = Vec<(usize, Identifier)>; +type IdArray<'n> = Vec<(usize, Option>)>; /// A map that contains the number of occurrences of expressions by their identifiers. -type ExprStats = HashMap; +type ExprStats<'n> = HashMap, usize>; /// A map that contains the common expressions and their alias extracted during the /// second, rewriting traversal. -type CommonExprs = IndexMap; +type CommonExprs<'n> = IndexMap, (Expr, String)>; /// Performs Common Sub-expression Elimination optimization. /// @@ -118,21 +139,86 @@ type CommonExprs = IndexMap; /// ProjectionExec(exprs=[extract (day from new_col), extract (year from new_col)]) <-- reuse here /// ProjectionExec(exprs=[to_date(c1) as new_col]) <-- compute to_date once /// ``` -pub struct CommonSubexprEliminate {} +pub struct CommonSubexprEliminate { + random_state: RandomState, +} impl CommonSubexprEliminate { + pub fn new() -> Self { + Self { + random_state: RandomState::new(), + } + } + + /// Returns the identifier list for each element in `exprs` and a flag to indicate if + /// rewrite phase of CSE make sense. 
+ /// + /// Returns an array with 1 element for each input expr in `exprs` + /// + /// Each element is itself the result of [`CommonSubexprEliminate::expr_to_identifier`] for that expr + /// (e.g. the identifiers for each node in the tree) + fn to_arrays<'n>( + &self, + exprs: &'n [Expr], + expr_stats: &mut ExprStats<'n>, + expr_mask: ExprMask, + ) -> Result<(bool, Vec>)> { + let mut found_common = false; + exprs + .iter() + .map(|e| { + let mut id_array = vec![]; + self.expr_to_identifier(e, expr_stats, &mut id_array, expr_mask) + .map(|fc| { + found_common |= fc; + + id_array + }) + }) + .collect::>>() + .map(|id_arrays| (found_common, id_arrays)) + } + + /// Add an identifier to `id_array` for every subexpression in this tree. + fn expr_to_identifier<'n>( + &self, + expr: &'n Expr, + expr_stats: &mut ExprStats<'n>, + id_array: &mut IdArray<'n>, + expr_mask: ExprMask, + ) -> Result { + // Don't consider volatile expressions for CSE. + Ok(if expr.is_volatile()? { + false + } else { + let mut visitor = ExprIdentifierVisitor { + expr_stats, + id_array, + visit_stack: vec![], + down_index: 0, + up_index: 0, + expr_mask, + random_state: &self.random_state, + found_common: false, + }; + expr.visit(&mut visitor)?; + + visitor.found_common + }) + } + /// Rewrites `exprs_list` with common sub-expressions replaced with a new /// column. /// /// `common_exprs` is updated with any sub expressions that were replaced.
/// /// Returns the rewritten expressions - fn rewrite_exprs_list( + fn rewrite_exprs_list<'n>( &self, exprs_list: Vec>, - arrays_list: &[&[IdArray]], - expr_stats: &ExprStats, - common_exprs: &mut CommonExprs, + arrays_list: Vec>>, + expr_stats: &ExprStats<'n>, + common_exprs: &mut CommonExprs<'n>, alias_generator: &AliasGenerator, ) -> Result>>> { let mut transformed = false; @@ -175,7 +261,7 @@ impl CommonSubexprEliminate { fn rewrite_expr( &self, exprs_list: Vec>, - arrays_list: &[&[IdArray]], + arrays_list: Vec>, input: LogicalPlan, expr_stats: &ExprStats, config: &dyn OptimizerConfig, @@ -275,68 +361,95 @@ impl CommonSubexprEliminate { config: &dyn OptimizerConfig, ) -> Result> { // collect all window expressions from any number of LogicalPlanWindow - let ConsecutiveWindowExprs { - window_exprs, - arrays_per_window, - expr_stats, - plan, - } = ConsecutiveWindowExprs::try_new(window)?; + let (mut window_exprs, mut window_schemas, mut plan) = + get_consecutive_window_exprs(window); - let arrays_per_window = arrays_per_window + let mut found_common = false; + let mut expr_stats = ExprStats::new(); + let arrays_per_window = window_exprs .iter() - .map(|arrays| arrays.as_slice()) - .collect::>(); + .map(|window_expr| { + self.to_arrays(window_expr, &mut expr_stats, ExprMask::Normal) + .map(|(fc, id_arrays)| { + found_common |= fc; - // save the original names - let name_preserver = NamePreserver::new(&plan); - let mut saved_names = window_exprs - .iter() - .map(|exprs| { - exprs - .iter() - .map(|expr| name_preserver.save(expr)) - .collect::>>() + id_arrays + }) }) .collect::>>()?; - assert_eq!(window_exprs.len(), arrays_per_window.len()); - let num_window_exprs = window_exprs.len(); - let rewritten_window_exprs = self.rewrite_expr( - window_exprs, - &arrays_per_window, - plan, - &expr_stats, - config, - )?; - let transformed = rewritten_window_exprs.transformed; + if found_common { + // save the original names + let name_preserver = NamePreserver::new(&plan); + 
let mut saved_names = window_exprs + .iter() + .map(|exprs| { + exprs + .iter() + .map(|expr| name_preserver.save(expr)) + .collect::>>() + }) + .collect::>>()?; - let (mut new_expr, new_input) = rewritten_window_exprs.data; + assert_eq!(window_exprs.len(), arrays_per_window.len()); + let num_window_exprs = window_exprs.len(); + let rewritten_window_exprs = self.rewrite_expr( + // Must clone as Identifiers use references to original expressions so we + // have to keep the original expressions intact. + window_exprs.clone(), + arrays_per_window, + plan, + &expr_stats, + config, + )?; + let transformed = rewritten_window_exprs.transformed; + assert!(transformed); - let mut plan = new_input; + let (mut new_expr, new_input) = rewritten_window_exprs.data; - // Construct consecutive window operator, with their corresponding new - // window expressions. - // - // Note this iterates over, `new_expr` and `saved_names` which are the - // same length, in reverse order - assert_eq!(num_window_exprs, new_expr.len()); - assert_eq!(num_window_exprs, saved_names.len()); - while let (Some(new_window_expr), Some(saved_names)) = - (new_expr.pop(), saved_names.pop()) - { - assert_eq!(new_window_expr.len(), saved_names.len()); + let mut plan = new_input; - // Rename re-written window expressions with original name, to - // preserve the output schema - let new_window_expr = new_window_expr - .into_iter() - .zip(saved_names.into_iter()) - .map(|(new_window_expr, saved_name)| saved_name.restore(new_window_expr)) - .collect::>>()?; - plan = LogicalPlan::Window(Window::try_new(new_window_expr, Arc::new(plan))?); - } + // Construct consecutive window operator, with their corresponding new + // window expressions. 
+ // + // Note this iterates over, `new_expr` and `saved_names` which are the + // same length, in reverse order + assert_eq!(num_window_exprs, new_expr.len()); + assert_eq!(num_window_exprs, saved_names.len()); + while let (Some(new_window_expr), Some(saved_names)) = + (new_expr.pop(), saved_names.pop()) + { + assert_eq!(new_window_expr.len(), saved_names.len()); + + // Rename re-written window expressions with original name, to + // preserve the output schema + let new_window_expr = new_window_expr + .into_iter() + .zip(saved_names.into_iter()) + .map(|(new_window_expr, saved_name)| { + saved_name.restore(new_window_expr) + }) + .collect::>>()?; + plan = LogicalPlan::Window(Window::try_new( + new_window_expr, + Arc::new(plan), + )?); + } - Ok(Transformed::new_transformed(plan, transformed)) + Ok(Transformed::new_transformed(plan, transformed)) + } else { + while let (Some(window_expr), Some(schema)) = + (window_exprs.pop(), window_schemas.pop()) + { + plan = LogicalPlan::Window(Window { + input: Arc::new(plan), + window_expr, + schema, + }); + } + + Ok(Transformed::no(plan)) + } } fn try_optimize_aggregate( @@ -351,56 +464,112 @@ impl CommonSubexprEliminate { schema: orig_schema, .. } = aggregate; - let mut expr_stats = ExprStats::new(); - // track transformed information let mut transformed = false; - // rewrite inputs - let group_arrays = to_arrays(&group_expr, &mut expr_stats, ExprMask::Normal)?; - let aggr_arrays = to_arrays(&aggr_expr, &mut expr_stats, ExprMask::Normal)?; - let name_perserver = NamePreserver::new_for_projection(); let saved_names = aggr_expr .iter() .map(|expr| name_perserver.save(expr)) .collect::>>()?; - // rewrite both group exprs and aggr_expr - let rewritten = self.rewrite_expr( - vec![group_expr, aggr_expr], - &[&group_arrays, &aggr_arrays], - unwrap_arc(input), - &expr_stats, - config, - )?; - transformed |= rewritten.transformed; - let (mut new_expr, new_input) = rewritten.data; - - // note the reversed pop order. 
- let new_aggr_expr = pop_expr(&mut new_expr)?; - let new_group_expr = pop_expr(&mut new_expr)?; + let mut expr_stats = ExprStats::new(); + // rewrite inputs + let (group_found_common, group_arrays) = + self.to_arrays(&group_expr, &mut expr_stats, ExprMask::Normal)?; + let (aggr_found_common, aggr_arrays) = + self.to_arrays(&aggr_expr, &mut expr_stats, ExprMask::Normal)?; + let (new_aggr_expr, new_group_expr, new_input) = + if group_found_common || aggr_found_common { + // rewrite both group exprs and aggr_expr + let rewritten = self.rewrite_expr( + // Must clone as Identifiers use references to original expressions so + // we have to keep the original expressions intact. + vec![group_expr.clone(), aggr_expr.clone()], + vec![group_arrays, aggr_arrays], + unwrap_arc(input), + &expr_stats, + config, + )?; + assert!(rewritten.transformed); + transformed |= rewritten.transformed; + let (mut new_expr, new_input) = rewritten.data; + + // note the reversed pop order. + let new_aggr_expr = pop_expr(&mut new_expr)?; + let new_group_expr = pop_expr(&mut new_expr)?; + + (new_aggr_expr, new_group_expr, Arc::new(new_input)) + } else { + (aggr_expr, group_expr, input) + }; // create potential projection on top let mut expr_stats = ExprStats::new(); - let new_input_schema = Arc::clone(new_input.schema()); - let aggr_arrays = to_arrays( + let (aggr_found_common, aggr_arrays) = self.to_arrays( &new_aggr_expr, &mut expr_stats, ExprMask::NormalAndAggregates, )?; - let mut common_exprs = IndexMap::new(); - let mut rewritten_exprs = self.rewrite_exprs_list( - vec![new_aggr_expr.clone()], - &[&aggr_arrays], - &expr_stats, - &mut common_exprs, - &config.alias_generator(), - )?; - transformed |= rewritten_exprs.transformed; - let rewritten = pop_expr(&mut rewritten_exprs.data)?; + if aggr_found_common { + let mut common_exprs = CommonExprs::new(); + let mut rewritten_exprs = self.rewrite_exprs_list( + // Must clone as Identifiers use references to original expressions so we + // have to 
keep the original expressions intact. + vec![new_aggr_expr.clone()], + vec![aggr_arrays], + &expr_stats, + &mut common_exprs, + &config.alias_generator(), + )?; + assert!(rewritten_exprs.transformed); + let rewritten = pop_expr(&mut rewritten_exprs.data)?; + + assert!(!common_exprs.is_empty()); + let mut agg_exprs = common_exprs + .into_values() + .map(|(expr, expr_alias)| expr.alias(expr_alias)) + .collect::>(); + + let new_input_schema = Arc::clone(new_input.schema()); + let mut proj_exprs = vec![]; + for expr in &new_group_expr { + extract_expressions(expr, &new_input_schema, &mut proj_exprs)? + } + for (expr_rewritten, expr_orig) in rewritten.into_iter().zip(new_aggr_expr) { + if expr_rewritten == expr_orig { + if let Expr::Alias(Alias { expr, name, .. }) = expr_rewritten { + agg_exprs.push(expr.alias(&name)); + proj_exprs.push(Expr::Column(Column::from_name(name))); + } else { + let expr_alias = config.alias_generator().next(CSE_PREFIX); + let (qualifier, field) = + expr_rewritten.to_field(&new_input_schema)?; + let out_name = qualified_name(qualifier.as_ref(), field.name()); + + agg_exprs.push(expr_rewritten.alias(&expr_alias)); + proj_exprs.push( + Expr::Column(Column::from_name(expr_alias)).alias(out_name), + ); + } + } else { + proj_exprs.push(expr_rewritten); + } + } - if common_exprs.is_empty() { + let agg = LogicalPlan::Aggregate(Aggregate::try_new( + new_input, + new_group_expr, + agg_exprs, + )?); + + Projection::try_new(proj_exprs, Arc::new(agg)) + .map(LogicalPlan::Projection) + .map(Transformed::yes) + } else { + // TODO: How exactly can the name or the schema change in this case? + // In theory `new_aggr_expr` and `new_group_expr` are either the original expressions or they were crafted via `rewrite_expr()`, that keeps the original expression names. + // If this is really needed can we have UT for it? 
// Alias aggregation expressions if they have changed let new_aggr_expr = new_aggr_expr .into_iter() @@ -409,57 +578,19 @@ impl CommonSubexprEliminate { .collect::>>()?; // Since group_expr may have changed, schema may also. Use try_new method. let new_agg = if transformed { - Aggregate::try_new(Arc::new(new_input), new_group_expr, new_aggr_expr)? + Aggregate::try_new(new_input, new_group_expr, new_aggr_expr)? } else { Aggregate::try_new_with_schema( - Arc::new(new_input), + new_input, new_group_expr, new_aggr_expr, orig_schema, )? }; let new_agg = LogicalPlan::Aggregate(new_agg); - return Ok(Transformed::new_transformed(new_agg, transformed)); - } - let mut agg_exprs = common_exprs - .into_values() - .map(|(expr, expr_alias)| expr.alias(expr_alias)) - .collect::>(); - - let mut proj_exprs = vec![]; - for expr in &new_group_expr { - extract_expressions(expr, &new_input_schema, &mut proj_exprs)? - } - for (expr_rewritten, expr_orig) in rewritten.into_iter().zip(new_aggr_expr) { - if expr_rewritten == expr_orig { - if let Expr::Alias(Alias { expr, name, .. 
}) = expr_rewritten { - agg_exprs.push(expr.alias(&name)); - proj_exprs.push(Expr::Column(Column::from_name(name))); - } else { - let expr_alias = config.alias_generator().next(CSE_PREFIX); - let (qualifier, field) = - expr_rewritten.to_field(&new_input_schema)?; - let out_name = qualified_name(qualifier.as_ref(), field.name()); - - agg_exprs.push(expr_rewritten.alias(&expr_alias)); - proj_exprs.push( - Expr::Column(Column::from_name(expr_alias)).alias(out_name), - ); - } - } else { - proj_exprs.push(expr_rewritten); - } - } - let agg = LogicalPlan::Aggregate(Aggregate::try_new( - Arc::new(new_input), - new_group_expr, - agg_exprs, - )?); - - Projection::try_new(proj_exprs, Arc::new(agg)) - .map(LogicalPlan::Projection) - .map(Transformed::yes) + Ok(Transformed::new_transformed(new_agg, transformed)) + } } /// Rewrites the expr list and input to remove common subexpressions @@ -483,13 +614,27 @@ impl CommonSubexprEliminate { config: &dyn OptimizerConfig, ) -> Result, LogicalPlan)>> { let mut expr_stats = ExprStats::new(); - let arrays = to_arrays(&expr, &mut expr_stats, ExprMask::Normal)?; - - self.rewrite_expr(vec![expr], &[&arrays], input, &expr_stats, config)? - .map_data(|(mut new_expr, new_input)| { + let (found_common, id_arrays) = + self.to_arrays(&expr, &mut expr_stats, ExprMask::Normal)?; + + if found_common { + let rewritten = self.rewrite_expr( + // Must clone as Identifiers use references to original expressions so we + // have to keep the original expressions intact. 
+ vec![expr.clone()], + vec![id_arrays], + input, + &expr_stats, + config, + )?; + assert!(rewritten.transformed); + rewritten.map_data(|(mut new_expr, new_input)| { assert_eq!(new_expr.len(), 1); Ok((new_expr.pop().unwrap(), new_input)) }) + } else { + Ok(Transformed::no((expr, input))) + } } } @@ -507,7 +652,7 @@ impl CommonSubexprEliminate { /// ``` /// /// Returns: -/// * `window_exprs`: `[a, b, c, d]` +/// * `window_exprs`: `[[a, b, c], [d]]` /// * InputPlan /// /// Consecutive window expressions may refer to same complex expression. @@ -524,41 +669,24 @@ impl CommonSubexprEliminate { /// ``` /// /// where, it is referred once by each `WindowAggr` (total of 2) in the plan. -struct ConsecutiveWindowExprs { - window_exprs: Vec>, - /// result of calling `to_arrays` on each set of window exprs - arrays_per_window: Vec>>, - expr_stats: ExprStats, - /// input plan to the window - plan: LogicalPlan, -} - -impl ConsecutiveWindowExprs { - fn try_new(window: Window) -> Result { - let mut window_exprs = vec![]; - let mut arrays_per_window = vec![]; - let mut expr_stats = ExprStats::new(); - - let mut plan = LogicalPlan::Window(window); - while let LogicalPlan::Window(Window { - input, window_expr, .. 
- }) = plan - { - plan = unwrap_arc(input); - - let arrays = to_arrays(&window_expr, &mut expr_stats, ExprMask::Normal)?; - - window_exprs.push(window_expr); - arrays_per_window.push(arrays); - } - - Ok(Self { - window_exprs, - arrays_per_window, - expr_stats, - plan, - }) +fn get_consecutive_window_exprs( + window: Window, +) -> (Vec>, Vec, LogicalPlan) { + let mut window_exprs = vec![]; + let mut window_schemas = vec![]; + let mut plan = LogicalPlan::Window(window); + while let LogicalPlan::Window(Window { + input, + window_expr, + schema, + }) = plan + { + window_exprs.push(window_expr); + window_schemas.push(schema); + + plan = unwrap_arc(input); } + (window_exprs, window_schemas, plan) } impl OptimizerRule for CommonSubexprEliminate { @@ -632,41 +760,12 @@ impl Default for CommonSubexprEliminate { } } -impl CommonSubexprEliminate { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - fn pop_expr(new_expr: &mut Vec>) -> Result> { new_expr .pop() .ok_or_else(|| internal_datafusion_err!("Failed to pop expression")) } -/// Returns the identifier list for each element in `exprs` -/// -/// Returns and array with 1 element for each input expr in `exprs` -/// -/// Each element is itself the result of [`expr_to_identifier`] for that expr -/// (e.g. the identifiers for each node in the tree) -fn to_arrays( - exprs: &[Expr], - expr_stats: &mut ExprStats, - expr_mask: ExprMask, -) -> Result> { - exprs - .iter() - .map(|e| { - let mut id_array = vec![]; - expr_to_identifier(e, expr_stats, &mut id_array, expr_mask)?; - - Ok(id_array) - }) - .collect() -} - /// Build the "intermediate" projection plan that evaluates the extracted common /// expressions. /// @@ -790,45 +889,48 @@ impl ExprMask { /// /// `Expr` without sub-expr (column, literal etc.) will not have identifier /// because they should not be recognized as common sub-expr. 
-struct ExprIdentifierVisitor<'a> { +struct ExprIdentifierVisitor<'a, 'n> { // statistics of expressions - expr_stats: &'a mut ExprStats, + expr_stats: &'a mut ExprStats<'n>, // cache to speed up second traversal - id_array: &'a mut IdArray, + id_array: &'a mut IdArray<'n>, // inner states - visit_stack: Vec, + visit_stack: Vec>, // preorder index, start from 0. down_index: usize, // postorder index, start from 0. up_index: usize, // which expression should be skipped? expr_mask: ExprMask, + // a `RandomState` to generate hashes during the first traversal + random_state: &'a RandomState, + // a flag to indicate that common expression found + found_common: bool, } /// Record item that used when traversing a expression tree. -enum VisitRecord { +enum VisitRecord<'n> { /// `usize` postorder index assigned in `f-down`(). Starts from 0. EnterMark(usize), /// the node's children were skipped => jump to f_up on same node JumpMark, /// Accumulated identifier of sub expression. - ExprItem(Identifier), + ExprItem(Identifier<'n>), } -impl ExprIdentifierVisitor<'_> { +impl<'n> ExprIdentifierVisitor<'_, 'n> { /// Find the first `EnterMark` in the stack, and accumulates every `ExprItem` /// before it. 
- fn pop_enter_mark(&mut self) -> Option<(usize, Identifier)> { - let mut desc = String::new(); + fn pop_enter_mark(&mut self) -> Option<(usize, Option>)> { + let mut expr_id = None; while let Some(item) = self.visit_stack.pop() { match item { VisitRecord::EnterMark(idx) => { - return Some((idx, desc)); + return Some((idx, expr_id)); } VisitRecord::ExprItem(id) => { - desc.push('|'); - desc.push_str(&id); + expr_id = Some(id.combine(expr_id)); } VisitRecord::JumpMark => return None, } @@ -837,21 +939,22 @@ impl ExprIdentifierVisitor<'_> { } } -impl<'n> TreeNodeVisitor<'n> for ExprIdentifierVisitor<'_> { +impl<'n> TreeNodeVisitor<'n> for ExprIdentifierVisitor<'_, 'n> { type Node = Expr; fn f_down(&mut self, expr: &'n Expr) -> Result { - // related to https://github.com/apache/arrow-datafusion/issues/8814 - // If the expr contain volatile expression or is a short-circuit expression, skip it. - // TODO: propagate is_volatile state bottom-up + consider non-volatile sub-expressions for CSE + // TODO: consider non-volatile sub-expressions for CSE // TODO: consider surely executed children of "short circuited"s for CSE - if expr.short_circuits() || expr.is_volatile()? { + + // If an expression can short circuit its children then don't consider it for CSE + // (https://github.com/apache/arrow-datafusion/issues/8814). 
+ if expr.short_circuits() { self.visit_stack.push(VisitRecord::JumpMark); return Ok(TreeNodeRecursion::Jump); } - self.id_array.push((0, "".to_string())); + self.id_array.push((0, None)); self.visit_stack .push(VisitRecord::EnterMark(self.down_index)); self.down_index += 1; @@ -864,13 +967,16 @@ impl<'n> TreeNodeVisitor<'n> for ExprIdentifierVisitor<'_> { return Ok(TreeNodeRecursion::Continue); }; - let expr_id = expr_identifier(expr, sub_expr_id); + let expr_id = Identifier::new(expr, self.random_state).combine(sub_expr_id); self.id_array[down_index].0 = self.up_index; if !self.expr_mask.ignores(expr) { - self.id_array[down_index].1.clone_from(&expr_id); - let count = self.expr_stats.entry(expr_id.clone()).or_insert(0); + self.id_array[down_index].1 = Some(expr_id); + let count = self.expr_stats.entry(expr_id).or_insert(0); *count += 1; + if *count > 1 { + self.found_common = true; + } } self.visit_stack.push(VisitRecord::ExprItem(expr_id)); self.up_index += 1; @@ -879,40 +985,17 @@ impl<'n> TreeNodeVisitor<'n> for ExprIdentifierVisitor<'_> { } } -fn expr_identifier(expr: &Expr, sub_expr_identifier: Identifier) -> Identifier { - format!("{{{expr}{sub_expr_identifier}}}") -} - -/// Go through an expression tree and generate identifier for every node in this tree. -fn expr_to_identifier( - expr: &Expr, - expr_stats: &mut ExprStats, - id_array: &mut IdArray, - expr_mask: ExprMask, -) -> Result<()> { - expr.visit(&mut ExprIdentifierVisitor { - expr_stats, - id_array, - visit_stack: vec![], - down_index: 0, - up_index: 0, - expr_mask, - })?; - - Ok(()) -} - /// Rewrite expression by replacing detected common sub-expression with /// the corresponding temporary column name. That column contains the /// evaluate result of replaced expression. 
-struct CommonSubexprRewriter<'a> { +struct CommonSubexprRewriter<'a, 'n> { // statistics of expressions - expr_stats: &'a ExprStats, + expr_stats: &'a ExprStats<'n>, // cache to speed up second traversal - id_array: &'a IdArray, + id_array: &'a IdArray<'n>, // common expression, that are replaced during the second traversal, are collected to // this map - common_exprs: &'a mut CommonExprs, + common_exprs: &'a mut CommonExprs<'n>, // preorder index, starts from 0. down_index: usize, // how many aliases have we seen so far @@ -921,17 +1004,9 @@ struct CommonSubexprRewriter<'a> { alias_generator: &'a AliasGenerator, } -impl TreeNodeRewriter for CommonSubexprRewriter<'_> { +impl TreeNodeRewriter for CommonSubexprRewriter<'_, '_> { type Node = Expr; - fn f_up(&mut self, expr: Expr) -> Result> { - if matches!(expr, Expr::Alias(_)) { - self.alias_counter -= 1 - } - - Ok(Transformed::no(expr)) - } - fn f_down(&mut self, expr: Expr) -> Result> { if matches!(expr, Expr::Alias(_)) { self.alias_counter += 1; @@ -940,33 +1015,32 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> { // The `CommonSubexprRewriter` relies on `ExprIdentifierVisitor` to generate // the `id_array`, which records the expr's identifier used to rewrite expr. So if we // skip an expr in `ExprIdentifierVisitor`, we should skip it here, too. - if expr.short_circuits() || expr.is_volatile()? { + if expr.short_circuits() { return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)); } - let (up_index, expr_id) = &self.id_array[self.down_index]; + let (up_index, expr_id) = self.id_array[self.down_index]; self.down_index += 1; // skip `Expr`s without identifier (empty identifier). - if expr_id.is_empty() { + let Some(expr_id) = expr_id else { return Ok(Transformed::no(expr)); - } + }; - let count = self.expr_stats.get(expr_id).unwrap(); + let count = self.expr_stats.get(&expr_id).unwrap(); if *count > 1 { // step index to skip all sub-node (which has smaller series number). 
while self.down_index < self.id_array.len() - && self.id_array[self.down_index].0 < *up_index + && self.id_array[self.down_index].0 < up_index { self.down_index += 1; } let expr_name = expr.display_name()?; - let (_, expr_alias) = - self.common_exprs.entry(expr_id.clone()).or_insert_with(|| { - let expr_alias = self.alias_generator.next(CSE_PREFIX); - (expr, expr_alias) - }); + let (_, expr_alias) = self.common_exprs.entry(expr_id).or_insert_with(|| { + let expr_alias = self.alias_generator.next(CSE_PREFIX); + (expr, expr_alias) + }); // alias the expressions without an `Alias` ancestor node let rewritten = if self.alias_counter > 0 { @@ -981,37 +1055,49 @@ impl TreeNodeRewriter for CommonSubexprRewriter<'_> { Ok(Transformed::no(expr)) } } + + fn f_up(&mut self, expr: Expr) -> Result> { + if matches!(expr, Expr::Alias(_)) { + self.alias_counter -= 1 + } + + Ok(Transformed::no(expr)) + } } /// Replace common sub-expression in `expr` with the corresponding temporary /// column name, updating `common_exprs` with any replaced expressions -fn replace_common_expr( +fn replace_common_expr<'n>( expr: Expr, - id_array: &IdArray, - expr_stats: &ExprStats, - common_exprs: &mut CommonExprs, + id_array: &IdArray<'n>, + expr_stats: &ExprStats<'n>, + common_exprs: &mut CommonExprs<'n>, alias_generator: &AliasGenerator, ) -> Result> { - expr.rewrite(&mut CommonSubexprRewriter { - expr_stats, - id_array, - common_exprs, - down_index: 0, - alias_counter: 0, - alias_generator, - }) + if id_array.is_empty() { + Ok(Transformed::no(expr)) + } else { + expr.rewrite(&mut CommonSubexprRewriter { + expr_stats, + id_array, + common_exprs, + down_index: 0, + alias_counter: 0, + alias_generator, + }) + } } #[cfg(test)] mod test { + use std::collections::HashSet; use std::iter; use arrow::datatypes::{DataType, Field, Schema}; - + use datafusion_expr::expr::AggregateFunction; use datafusion_expr::logical_plan::{table_scan, JoinType}; - use datafusion_expr::{ - grouping_set, 
AccumulatorFactoryFunction, AggregateUDF, Signature, + grouping_set, AccumulatorFactoryFunction, AggregateUDF, BinaryExpr, Signature, SimpleAggregateUDF, Volatility, }; use datafusion_expr::{lit, logical_plan::builder::LogicalPlanBuilder}; @@ -1028,7 +1114,7 @@ mod test { config: Option<&dyn OptimizerConfig>, ) { assert_eq!(expected, format!("{plan:?}"), "Unexpected starting plan"); - let optimizer = CommonSubexprEliminate {}; + let optimizer = CommonSubexprEliminate::new(); let default_config = OptimizerContext::new(); let config = config.unwrap_or(&default_config); let optimized_plan = optimizer.rewrite(plan, config).unwrap(); @@ -1046,7 +1132,7 @@ mod test { plan: LogicalPlan, config: Option<&dyn OptimizerConfig>, ) { - let optimizer = CommonSubexprEliminate {}; + let optimizer = CommonSubexprEliminate::new(); let default_config = OptimizerContext::new(); let config = config.unwrap_or(&default_config); let optimized_plan = optimizer.rewrite(plan, config).unwrap(); @@ -1058,51 +1144,147 @@ mod test { #[test] fn id_array_visitor() -> Result<()> { - let expr = ((sum(col("a") + lit(1))) - avg(col("c"))) * lit(2); + let optimizer = CommonSubexprEliminate::new(); + + let a_plus_1 = col("a") + lit(1); + let avg_c = avg(col("c")); + let sum_a_plus_1 = sum(a_plus_1); + let sum_a_plus_1_minus_avg_c = sum_a_plus_1 - avg_c; + let expr = sum_a_plus_1_minus_avg_c * lit(2); + + let Expr::BinaryExpr(BinaryExpr { + left: sum_a_plus_1_minus_avg_c, + .. + }) = &expr + else { + panic!("Cannot extract subexpression reference") + }; + let Expr::BinaryExpr(BinaryExpr { + left: sum_a_plus_1, + right: avg_c, + .. + }) = sum_a_plus_1_minus_avg_c.as_ref() + else { + panic!("Cannot extract subexpression reference") + }; + let Expr::AggregateFunction(AggregateFunction { + args: a_plus_1_vec, .. 
+ }) = sum_a_plus_1.as_ref() + else { + panic!("Cannot extract subexpression reference") + }; + let a_plus_1 = &a_plus_1_vec.as_slice()[0]; // skip aggregates let mut id_array = vec![]; - expr_to_identifier(&expr, &mut HashMap::new(), &mut id_array, ExprMask::Normal)?; + optimizer.expr_to_identifier( + &expr, + &mut ExprStats::new(), + &mut id_array, + ExprMask::Normal, + )?; + + // Collect distinct hashes and set them to 0 in `id_array` + fn collect_hashes(id_array: &mut IdArray) -> HashSet { + id_array + .iter_mut() + .flat_map(|(_, expr_id_option)| { + expr_id_option.as_mut().map(|expr_id| { + let hash = expr_id.hash; + expr_id.hash = 0; + hash + }) + }) + .collect::>() + } + + let hashes = collect_hashes(&mut id_array); + assert_eq!(hashes.len(), 3); let expected = vec![ - (8, "{(sum(a + Int32(1)) - avg(c)) * Int32(2)|{Int32(2)}|{sum(a + Int32(1)) - avg(c)|{avg(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}}"), - (6, "{sum(a + Int32(1)) - avg(c)|{avg(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"), - (3, ""), - (2, "{a + Int32(1)|{Int32(1)}|{a}}"), - (0, ""), - (1, ""), - (5, ""), - (4, ""), - (7, "") - ] - .into_iter() - .map(|(number, id)| (number, id.into())) - .collect::>(); + ( + 8, + Some(Identifier { + hash: 0, + expr: &expr, + }), + ), + ( + 6, + Some(Identifier { + hash: 0, + expr: sum_a_plus_1_minus_avg_c, + }), + ), + (3, None), + ( + 2, + Some(Identifier { + hash: 0, + expr: a_plus_1, + }), + ), + (0, None), + (1, None), + (5, None), + (4, None), + (7, None), + ]; assert_eq!(expected, id_array); // include aggregates let mut id_array = vec![]; - expr_to_identifier( + optimizer.expr_to_identifier( &expr, - &mut HashMap::new(), + &mut ExprStats::new(), &mut id_array, ExprMask::NormalAndAggregates, )?; + let hashes = collect_hashes(&mut id_array); + assert_eq!(hashes.len(), 5); + let expected = vec![ - (8, "{(sum(a + Int32(1)) - avg(c)) * Int32(2)|{Int32(2)}|{sum(a + Int32(1)) - avg(c)|{avg(c)|{c}}|{sum(a + Int32(1))|{a + 
Int32(1)|{Int32(1)}|{a}}}}}"), - (6, "{sum(a + Int32(1)) - avg(c)|{avg(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"), - (3, "{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}"), - (2, "{a + Int32(1)|{Int32(1)}|{a}}"), - (0, ""), - (1, ""), - (5, "{avg(c)|{c}}"), - (4, ""), - (7, "") - ] - .into_iter() - .map(|(number, id)| (number, id.into())) - .collect::>(); + ( + 8, + Some(Identifier { + hash: 0, + expr: &expr, + }), + ), + ( + 6, + Some(Identifier { + hash: 0, + expr: sum_a_plus_1_minus_avg_c, + }), + ), + ( + 3, + Some(Identifier { + hash: 0, + expr: sum_a_plus_1, + }), + ), + ( + 2, + Some(Identifier { + hash: 0, + expr: a_plus_1, + }), + ), + (0, None), + (1, None), + ( + 5, + Some(Identifier { + hash: 0, + expr: avg_c, + }), + ), + (4, None), + (7, None), + ]; assert_eq!(expected, id_array); Ok(()) @@ -1367,27 +1549,35 @@ mod test { Ok(()) } + fn test_identifier(hash: u64, expr: &Expr) -> Identifier { + Identifier { hash, expr } + } + #[test] fn redundant_project_fields() { let table_scan = test_table_scan().unwrap(); + let c_plus_a = col("c") + col("a"); + let b_plus_a = col("b") + col("a"); let common_exprs_1 = CommonExprs::from([ ( - "c+a".to_string(), - (col("c") + col("a"), format!("{CSE_PREFIX}_1")), + test_identifier(0, &c_plus_a), + (c_plus_a.clone(), format!("{CSE_PREFIX}_1")), ), ( - "b+a".to_string(), - (col("b") + col("a"), format!("{CSE_PREFIX}_2")), + test_identifier(1, &b_plus_a), + (b_plus_a.clone(), format!("{CSE_PREFIX}_2")), ), ]); + let c_plus_a_2 = col(format!("{CSE_PREFIX}_1")); + let b_plus_a_2 = col(format!("{CSE_PREFIX}_2")); let common_exprs_2 = CommonExprs::from([ ( - "c+a".to_string(), - (col(format!("{CSE_PREFIX}_1")), format!("{CSE_PREFIX}_3")), + test_identifier(3, &c_plus_a_2), + (c_plus_a_2.clone(), format!("{CSE_PREFIX}_3")), ), ( - "b+a".to_string(), - (col(format!("{CSE_PREFIX}_2")), format!("{CSE_PREFIX}_4")), + test_identifier(4, &b_plus_a_2), + (b_plus_a_2.clone(), format!("{CSE_PREFIX}_4")), ), ]); let 
project = build_common_expr_project_plan(table_scan, common_exprs_1).unwrap(); @@ -1408,24 +1598,28 @@ mod test { .unwrap() .build() .unwrap(); + let c_plus_a = col("test1.c") + col("test1.a"); + let b_plus_a = col("test1.b") + col("test1.a"); let common_exprs_1 = CommonExprs::from([ ( - "test1.c+test1.a".to_string(), - (col("test1.c") + col("test1.a"), format!("{CSE_PREFIX}_1")), + test_identifier(0, &c_plus_a), + (c_plus_a.clone(), format!("{CSE_PREFIX}_1")), ), ( - "test1.b+test1.a".to_string(), - (col("test1.b") + col("test1.a"), format!("{CSE_PREFIX}_2")), + test_identifier(1, &b_plus_a), + (b_plus_a.clone(), format!("{CSE_PREFIX}_2")), ), ]); + let c_plus_a_2 = col(format!("{CSE_PREFIX}_1")); + let b_plus_a_2 = col(format!("{CSE_PREFIX}_2")); let common_exprs_2 = CommonExprs::from([ ( - "test1.c+test1.a".to_string(), - (col(format!("{CSE_PREFIX}_1")), format!("{CSE_PREFIX}_3")), + test_identifier(3, &c_plus_a_2), + (c_plus_a_2.clone(), format!("{CSE_PREFIX}_3")), ), ( - "test1.b+test1.a".to_string(), - (col(format!("{CSE_PREFIX}_2")), format!("{CSE_PREFIX}_4")), + test_identifier(4, &b_plus_a_2), + (b_plus_a_2.clone(), format!("{CSE_PREFIX}_4")), ), ]); let project = build_common_expr_project_plan(join, common_exprs_1).unwrap(); @@ -1457,7 +1651,7 @@ mod test { .unwrap() .build() .unwrap(); - let rule = CommonSubexprEliminate {}; + let rule = CommonSubexprEliminate::new(); let optimized_plan = rule.rewrite(plan, &OptimizerContext::new()).unwrap(); assert!(!optimized_plan.transformed); let optimized_plan = optimized_plan.data; diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml index 66ce7cbd838f..e5d65827cdec 100644 --- a/datafusion/proto-common/Cargo.toml +++ b/datafusion/proto-common/Cargo.toml @@ -26,7 +26,7 @@ homepage = { workspace = true } repository = { workspace = true } license = { workspace = true } authors = { workspace = true } -rust-version = "1.75" +rust-version = "1.76" # Exclude proto files so crates.io 
consumers don't need protoc exclude = ["*.proto"] diff --git a/datafusion/proto-common/gen/Cargo.toml b/datafusion/proto-common/gen/Cargo.toml index 9f8f03de6dc9..54ec0e44694b 100644 --- a/datafusion/proto-common/gen/Cargo.toml +++ b/datafusion/proto-common/gen/Cargo.toml @@ -20,7 +20,7 @@ name = "gen-common" description = "Code generation for proto" version = "0.1.0" edition = { workspace = true } -rust-version = "1.75" +rust-version = "1.76" authors = { workspace = true } homepage = { workspace = true } repository = { workspace = true } diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index aa8d0e55b68f..95d9e6700a50 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -27,7 +27,7 @@ repository = { workspace = true } license = { workspace = true } authors = { workspace = true } # Specify MSRV here as `cargo msrv` doesn't support workspace version -rust-version = "1.75" +rust-version = "1.76" # Exclude proto files so crates.io consumers don't need protoc exclude = ["*.proto"] diff --git a/datafusion/proto/gen/Cargo.toml b/datafusion/proto/gen/Cargo.toml index eabaf7ba8e14..401c51c94563 100644 --- a/datafusion/proto/gen/Cargo.toml +++ b/datafusion/proto/gen/Cargo.toml @@ -20,7 +20,7 @@ name = "gen" description = "Code generation for proto" version = "0.1.0" edition = { workspace = true } -rust-version = "1.75" +rust-version = "1.76" authors = { workspace = true } homepage = { workspace = true } repository = { workspace = true } diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index d934dba4cfea..f3f8f6e3abca 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -26,7 +26,7 @@ repository = { workspace = true } license = { workspace = true } authors = { workspace = true } # Specify MSRV here as `cargo msrv` doesn't support workspace version -rust-version = "1.75" +rust-version = "1.76" [lints] workspace = true