Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
/// default implementation will not be called (left as `todo!()`)
fn simplify(&self) -> Option<WindowFunctionSimplification> {
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
Ok(Expr::WindowFunction(WindowFunction {
Ok(Expr::from(WindowFunction {
fun: WindowFunctionDefinition::AggregateUDF(avg_udaf()),
args: window_function.args,
partition_by: window_function.partition_by,
Expand Down
35 changes: 17 additions & 18 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,19 +587,17 @@ impl DefaultPhysicalPlanner {
};

let get_sort_keys = |expr: &Expr| match expr {
Expr::WindowFunction(WindowFunction {
ref partition_by,
ref order_by,
..
}) => generate_sort_key(partition_by, order_by),
Expr::WindowFunction(window_function) => generate_sort_key(
window_function.partition_by(),
window_function.order_by(),
),
Expr::Alias(Alias { expr, .. }) => {
// Convert &Box<T> to &T
match &**expr {
Expr::WindowFunction(WindowFunction {
ref partition_by,
ref order_by,
..
}) => generate_sort_key(partition_by, order_by),
Expr::WindowFunction(window_function) => generate_sort_key(
window_function.partition_by(),
window_function.order_by(),
),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -1520,14 +1518,15 @@ pub fn create_window_expr_with_name(
let name = name.into();
let physical_schema: &Schema = &logical_schema.into();
match e {
Expr::WindowFunction(WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
null_treatment,
}) => {
Expr::WindowFunction(window_function) => {
let WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
null_treatment,
} = window_function.as_ref();
let physical_args =
create_physical_exprs(args, logical_schema, execution_props)?;
let partition_by =
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ async fn window_using_aggregates() -> Result<()> {
vec![col("c3")],
);

Expr::WindowFunction(w)
Expr::from(w)
.null_treatment(NullTreatment::IgnoreNulls)
.order_by(vec![col("c2").sort(true, true), col("c3").sort(true, true)])
.window_frame(WindowFrame::new_bounds(
Expand Down Expand Up @@ -2519,7 +2519,7 @@ async fn test_count_wildcard_on_window() -> Result<()> {
let df_results = ctx
.table("t1")
.await?
.select(vec![Expr::WindowFunction(WindowFunction::new(
.select(vec![Expr::from(WindowFunction::new(
WindowFunctionDefinition::AggregateUDF(count_udaf()),
vec![wildcard()],
))
Expand Down
116 changes: 85 additions & 31 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::utils::expr_to_columns;
use crate::Volatility;
use crate::{udaf, ExprSchemable, Operator, Signature, WindowFrame, WindowUDF};

use crate::function::WindowFunctionSimplification;
use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::cse::{HashNode, NormalizeEq, Normalizeable};
use datafusion_common::tree_node::{
Expand Down Expand Up @@ -297,7 +298,7 @@ pub enum Expr {
/// [`ExprFunctionExt`]: crate::expr_fn::ExprFunctionExt
AggregateFunction(AggregateFunction),
/// Represents the call of a window function with arguments.
WindowFunction(WindowFunction),
WindowFunction(Box<WindowFunction>), // Boxed as it is large (272 bytes)
/// Returns whether the list contains the expr value.
InList(InList),
/// EXISTS subquery
Expand Down Expand Up @@ -341,6 +342,13 @@ impl From<Column> for Expr {
}
}

/// Create an [`Expr`] from a [`WindowFunction`]
impl From<WindowFunction> for Expr {
fn from(value: WindowFunction) -> Self {
Expr::WindowFunction(Box::new(value))
}
}

/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is
/// useful for creating [`Expr`] from a [`DFSchema`].
///
Expand Down Expand Up @@ -774,6 +782,16 @@ impl WindowFunctionDefinition {
WindowFunctionDefinition::AggregateUDF(fun) => fun.name(),
}
}

/// Return the the inner window simplification function, if any
///
/// See [`WindowFunctionSimplification`] for more information
pub fn simplify(&self) -> Option<WindowFunctionSimplification> {
match self {
WindowFunctionDefinition::AggregateUDF(_) => None,
WindowFunctionDefinition::WindowUDF(udwf) => udwf.simplify(),
}
}
}

impl Display for WindowFunctionDefinition {
Expand Down Expand Up @@ -838,6 +856,23 @@ impl WindowFunction {
null_treatment: None,
}
}

/// Returns a reference to the partition by expressions of this window function
pub fn partition_by(&self) -> &Vec<Expr> {
&self.partition_by
}

/// Returns a reference to the order by (sort) expressions of this window function
pub fn order_by(&self) -> &Vec<Sort> {
&self.order_by
}

/// Return the inner window simplification function, if any
///
/// Delegates to the `simplify` method of this window function's `fun`.
///
/// See [`WindowFunctionSimplification`] for more information
pub fn simplify(&self) -> Option<WindowFunctionSimplification> {
self.fun.simplify()
}
}

/// EXISTS expression
Expand Down Expand Up @@ -1893,24 +1928,24 @@ impl NormalizeEq for Expr {
_ => false,
}
}
(
Expr::WindowFunction(WindowFunction {
(Expr::WindowFunction(left), Expr::WindowFunction(right)) => {
let WindowFunction {
fun: self_fun,
args: self_args,
partition_by: self_partition_by,
order_by: self_order_by,
window_frame: self_window_frame,
null_treatment: self_null_treatment,
}),
Expr::WindowFunction(WindowFunction {
} = left.as_ref();
let WindowFunction {
fun: other_fun,
args: other_args,
partition_by: other_partition_by,
order_by: other_order_by,
window_frame: other_window_frame,
null_treatment: other_null_treatment,
}),
) => {
} = right.as_ref();

self_fun.name() == other_fun.name()
&& self_window_frame == other_window_frame
&& self_null_treatment == other_null_treatment
Expand Down Expand Up @@ -2150,14 +2185,15 @@ impl HashNode for Expr {
distinct.hash(state);
null_treatment.hash(state);
}
Expr::WindowFunction(WindowFunction {
fun,
args: _args,
partition_by: _partition_by,
order_by: _order_by,
window_frame,
null_treatment,
}) => {
Expr::WindowFunction(window_func) => {
let WindowFunction {
fun,
args: _args,
partition_by: _partition_by,
order_by: _order_by,
window_frame,
null_treatment,
} = window_func.as_ref();
fun.hash(state);
window_frame.hash(state);
null_treatment.hash(state);
Expand Down Expand Up @@ -2458,14 +2494,15 @@ impl Display for SchemaDisplay<'_> {

Ok(())
}
Expr::WindowFunction(WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
null_treatment,
}) => {
Expr::WindowFunction(window_func) => {
let WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
null_treatment,
} = window_func.as_ref();
write!(
f,
"{}({})",
Expand Down Expand Up @@ -2612,14 +2649,16 @@ impl Display for Expr {
// Expr::ScalarFunction(ScalarFunction { func, args }) => {
// write!(f, "{}", func.display_name(args).unwrap())
// }
Expr::WindowFunction(WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
null_treatment,
}) => {
Expr::WindowFunction(window_func) => {
let WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
null_treatment,
} = window_func.as_ref();

fmt_function(f, &fun.to_string(), false, args, true)?;

if let Some(nt) = null_treatment {
Expand Down Expand Up @@ -3067,4 +3106,19 @@ mod test {
rename: opt_rename,
}
}

#[test]
fn test_size_of_expr() {
// because Expr is such a widely used struct in DataFusion
// it is important to keep its size as small as possible
//
// If this test fails when you change `Expr`, please try
// `Box`ing the fields to make `Expr` smaller
// See https://github.com/apache/datafusion/issues/14256 for details
//
// NOTE(review): the expected byte counts presumably assume a 64-bit
// target (8-byte pointers) -- confirm before relying on them elsewhere
assert_eq!(size_of::<Expr>(), 112);
assert_eq!(size_of::<ScalarValue>(), 64);
assert_eq!(size_of::<DataType>(), 24); // 3 ptrs
assert_eq!(size_of::<Vec<Expr>>(), 24);
assert_eq!(size_of::<Arc<Expr>>(), 8);
}
}
10 changes: 5 additions & 5 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ impl ExprFuncBuilder {
udwf.window_frame =
window_frame.unwrap_or(WindowFrame::new(has_order_by));
udwf.null_treatment = null_treatment;
Expr::WindowFunction(udwf)
Expr::from(udwf)
}
};

Expand Down Expand Up @@ -897,7 +897,7 @@ impl ExprFunctionExt for Expr {
ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)))
}
Expr::WindowFunction(udwf) => {
ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)))
ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)))
}
_ => ExprFuncBuilder::new(None),
};
Expand Down Expand Up @@ -937,7 +937,7 @@ impl ExprFunctionExt for Expr {
ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)))
}
Expr::WindowFunction(udwf) => {
ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)))
ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)))
}
_ => ExprFuncBuilder::new(None),
};
Expand All @@ -950,7 +950,7 @@ impl ExprFunctionExt for Expr {
fn partition_by(self, partition_by: Vec<Expr>) -> ExprFuncBuilder {
match self {
Expr::WindowFunction(udwf) => {
let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)));
let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)));
builder.partition_by = Some(partition_by);
builder
}
Expand All @@ -961,7 +961,7 @@ impl ExprFunctionExt for Expr {
fn window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder {
match self {
Expr::WindowFunction(udwf) => {
let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)));
let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)));
builder.window_frame = Some(window_frame);
builder
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub type AggregateFunctionSimplification = Box<
>;

/// [crate::udwf::WindowUDFImpl::simplify] simplifier closure
///
/// A closure with two arguments:
/// * 'window_function': [crate::expr::WindowFunction] for which simplified has been invoked
/// * 'info': [crate::simplify::SimplifyInfo]
Expand Down
23 changes: 14 additions & 9 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2420,19 +2420,24 @@ impl Window {
.iter()
.enumerate()
.filter_map(|(idx, expr)| {
if let Expr::WindowFunction(WindowFunction {
let Expr::WindowFunction(window_func) = expr else {
return None;
};
let WindowFunction {
fun: WindowFunctionDefinition::WindowUDF(udwf),
partition_by,
..
}) = expr
{
// When there is no PARTITION BY, row number will be unique
// across the entire table.
if udwf.name() == "row_number" && partition_by.is_empty() {
return Some(idx + input_len);
}
} = window_func.as_ref()
else {
return None;
};
// When there is no PARTITION BY, row number will be unique
// across the entire table.
if udwf.name() == "row_number" && partition_by.is_empty() {
Some(idx + input_len)
} else {
None
}
None
})
.map(|idx| {
FunctionalDependence::new(vec![idx], vec![], false)
Expand Down
Loading