Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 78 additions & 52 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
};

use datafusion_expr::utils::AggregateOrderSensitivity;
use itertools::Itertools;

pub(crate) mod group_values;
Expand Down Expand Up @@ -1071,13 +1072,25 @@ fn get_aggregate_expr_req(
aggr_expr: &AggregateFunctionExpr,
group_by: &PhysicalGroupBy,
agg_mode: &AggregateMode,
include_soft_requirement: bool,
) -> Option<LexOrdering> {
// If the aggregation function's ordering requirement is not absolutely
// necessary, or the aggregation is performing a "second stage" calculation,
// then ignore the ordering requirement.
if !aggr_expr.order_sensitivity().hard_requires() || !agg_mode.is_first_stage() {
// If the aggregation is performing a "second stage" calculation,
// then ignore the ordering requirement. Ordering requirement applies
// only to the aggregation input data.
if !agg_mode.is_first_stage() {
return None;
}

match aggr_expr.order_sensitivity() {
AggregateOrderSensitivity::Insensitive => return None,
AggregateOrderSensitivity::HardRequirement => {}
AggregateOrderSensitivity::Beneficial => {
if !include_soft_requirement {
return None;
}
}
}

let mut sort_exprs = aggr_expr.order_bys().to_vec();
// In non-first stage modes, we accumulate data (using `merge_batch`) from
// different partitions (i.e. merge partial results). During this merge, we
Expand Down Expand Up @@ -1142,60 +1155,73 @@ pub fn get_finer_aggregate_exprs_requirement(
agg_mode: &AggregateMode,
) -> Result<Vec<PhysicalSortRequirement>> {
let mut requirement = None;
for aggr_expr in aggr_exprs.iter_mut() {
let Some(aggr_req) = get_aggregate_expr_req(aggr_expr, group_by, agg_mode)
.and_then(|o| eq_properties.normalize_sort_exprs(o))
else {
// There is no aggregate ordering requirement, or it is trivially
// satisfied -- we can skip this expression.
continue;
};
// If the common requirement is finer than the current expression's,
// we can skip this expression. If the latter is finer than the former,
// adopt it if it is satisfied by the equivalence properties. Otherwise,
// defer the analysis to the reverse expression.
let forward_finer = determine_finer(&requirement, &aggr_req);
if let Some(finer) = forward_finer {
if !finer {
continue;
} else if eq_properties.ordering_satisfy(aggr_req.clone())? {
requirement = Some(aggr_req);
continue;
}
}
if let Some(reverse_aggr_expr) = aggr_expr.reverse_expr() {
let Some(rev_aggr_req) =
get_aggregate_expr_req(&reverse_aggr_expr, group_by, agg_mode)
.and_then(|o| eq_properties.normalize_sort_exprs(o))
else {
// The reverse requirement is trivially satisfied -- just reverse
// the expression and continue with the next one:
*aggr_expr = Arc::new(reverse_aggr_expr);

for include_soft_requirement in [false, true] {
for aggr_expr in aggr_exprs.iter_mut() {
let Some(aggr_req) = get_aggregate_expr_req(
aggr_expr,
group_by,
agg_mode,
include_soft_requirement,
)
.and_then(|o| eq_properties.normalize_sort_exprs(o)) else {
// There is no aggregate ordering requirement, or it is trivially
// satisfied -- we can skip this expression.
continue;
};
// If the common requirement is finer than the reverse expression's,
// just reverse it and continue the loop with the next aggregate
// expression. If the latter is finer than the former, adopt it if
// it is satisfied by the equivalence properties. Otherwise, adopt
// the forward expression.
if let Some(finer) = determine_finer(&requirement, &rev_aggr_req) {
// If the common requirement is finer than the current expression's,
// we can skip this expression. If the latter is finer than the former,
// adopt it if it is satisfied by the equivalence properties. Otherwise,
// defer the analysis to the reverse expression.
let forward_finer = determine_finer(&requirement, &aggr_req);
if let Some(finer) = forward_finer {
if !finer {
continue;
} else if eq_properties.ordering_satisfy(aggr_req.clone())? {
requirement = Some(aggr_req);
continue;
}
}
if let Some(reverse_aggr_expr) = aggr_expr.reverse_expr() {
let Some(rev_aggr_req) = get_aggregate_expr_req(
&reverse_aggr_expr,
group_by,
agg_mode,
include_soft_requirement,
)
.and_then(|o| eq_properties.normalize_sort_exprs(o)) else {
// The reverse requirement is trivially satisfied -- just reverse
// the expression and continue with the next one:
*aggr_expr = Arc::new(reverse_aggr_expr);
} else if eq_properties.ordering_satisfy(rev_aggr_req.clone())? {
*aggr_expr = Arc::new(reverse_aggr_expr);
requirement = Some(rev_aggr_req);
} else {
continue;
};
// If the common requirement is finer than the reverse expression's,
// just reverse it and continue the loop with the next aggregate
// expression. If the latter is finer than the former, adopt it if
// it is satisfied by the equivalence properties. Otherwise, adopt
// the forward expression.
if let Some(finer) = determine_finer(&requirement, &rev_aggr_req) {
if !finer {
*aggr_expr = Arc::new(reverse_aggr_expr);
} else if eq_properties.ordering_satisfy(rev_aggr_req.clone())? {
*aggr_expr = Arc::new(reverse_aggr_expr);
requirement = Some(rev_aggr_req);
} else {
requirement = Some(aggr_req);
}
} else if forward_finer.is_some() {
requirement = Some(aggr_req);
} else {
// Neither the existing requirement nor the current aggregate
// requirement satisfy the other (forward or reverse), this
// means they are conflicting. This is a problem only for hard
// requirements. Unsatisfied soft requirements can be ignored.
if !include_soft_requirement {
return not_impl_err!(
"Conflicting ordering requirements in aggregate functions is not supported"
);
}
}
} else if forward_finer.is_some() {
requirement = Some(aggr_req);
} else {
// Neither the existing requirement nor the current aggregate
// requirement satisfy the other (forward or reverse), this
// means they are conflicting.
return not_impl_err!(
"Conflicting ordering requirements in aggregate functions is not supported"
);
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,9 @@ physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
04)------SortExec: expr=[c2@1 DESC, c3@2 ASC NULLS LAST], preserve_partitioning=[true]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true

# test array_agg_order with list data type
statement ok
Expand Down Expand Up @@ -6353,7 +6354,7 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
02)--TableScan: convert_first_last_table projection=[c1, c3]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
Expand All @@ -6367,7 +6368,7 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
02)--TableScan: convert_first_last_table projection=[c1, c2]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
Expand Down
5 changes: 3 additions & 2 deletions datafusion/sqllogictest/test_files/distinct_on.slt
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[first_value(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], first_value(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
08)--------------SortExec: expr=[c3@2 ASC NULLS LAST], preserve_partitioning=[true]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true

# ON expressions are not a sub-set of the ORDER BY expressions
query error SELECT DISTINCT ON expressions must match initial ORDER BY expressions
Expand Down
Loading
Loading