Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 2 additions & 38 deletions datafusion/physical-expr/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl AggregateExprBuilder {

let return_field = fun.return_field(&input_exprs_fields)?;
let is_nullable = fun.is_nullable();
// TODO rename AggregateExprBuilder::alias to name
let name = match alias {
None => {
return internal_err!(
Expand Down Expand Up @@ -575,18 +576,10 @@ impl AggregateFunctionExpr {
ReversedUDAF::NotSupported => None,
ReversedUDAF::Identical => Some(self.clone()),
ReversedUDAF::Reversed(reverse_udf) => {
let mut name = self.name().to_string();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing this may have other unintended effects. I will request some more eyes on this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!
I agree this code was deliberate & nice. I hope we don't parse those names though.
If there is a better solution to agg reverse causing failures (#16625 (comment)), let me know. I can also drop this fix, I don't like it too.

Alternatively to the fix, I can block reversing for beneficial functions and thus hide the problem for now. Would it be preferred for this PR?

// If the function is changed, we need to reverse order_by clause as well
// i.e. First(a order by b asc null first) -> Last(a order by b desc null last)
if self.fun().name() != reverse_udf.name() {
replace_order_by_clause(&mut name);
}
replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());

AggregateExprBuilder::new(reverse_udf, self.args.to_vec())
.order_by(self.order_bys.iter().map(|e| e.reverse()).collect())
.schema(Arc::new(self.schema.clone()))
.alias(name)
.alias(self.name())
.with_ignore_nulls(self.ignore_nulls)
.with_distinct(self.is_distinct)
.with_reversed(!self.is_reversed)
Expand Down Expand Up @@ -684,32 +677,3 @@ impl PartialEq for AggregateFunctionExpr {
.all(|(this_arg, other_arg)| this_arg.eq(other_arg))
}
}

fn replace_order_by_clause(order_by: &mut String) {
let suffixes = [
(" DESC NULLS FIRST]", " ASC NULLS LAST]"),
(" ASC NULLS FIRST]", " DESC NULLS LAST]"),
(" DESC NULLS LAST]", " ASC NULLS FIRST]"),
(" ASC NULLS LAST]", " DESC NULLS FIRST]"),
];

if let Some(start) = order_by.find("ORDER BY [") {
if let Some(end) = order_by[start..].find(']') {
let order_by_start = start + 9;
let order_by_end = start + end;

let column_order = &order_by[order_by_start..=order_by_end];
for (suffix, replacement) in suffixes {
if column_order.ends_with(suffix) {
let new_order = column_order.replace(suffix, replacement);
order_by.replace_range(order_by_start..=order_by_end, &new_order);
break;
}
}
}
}
}

fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) {
*aggr_name = aggr_name.replace(fn_name_old, fn_name_new);
}
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6354,9 +6354,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
02)--TableScan: convert_first_last_table projection=[c1, c3]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this makes it look like the plan is wrong:

  1. the query calls for first_value(c1 ORDER BY c3 desc) but the table is sorted by c3 ASC

I think internally the optimizer has rewritten first_value(c1 ORDER BY c3 desc) to last_value(c1 ORDER BY c3 ASC)

However this plan makes it look like that didn't happen

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was renaming to address this, but the renaming did not exactly work -- #16625 (comment).

04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], file_type=csv, has_header=true

Expand All @@ -6368,9 +6368,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
02)--TableScan: convert_first_last_table projection=[c1, c2]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], file_type=csv, has_header=true

Expand Down
Loading