
Commit 4cd992c

Generate sorts based on aggregations' soft requirements
Previously, the sort planned before an aggregation only honored ordered aggregate functions with `AggregateOrderSensitivity::HardRequirement`. This change also takes the ordering expectations of `AggregateOrderSensitivity::Beneficial` functions into account. When such a beneficial (soft) ordering requirement cannot be satisfied, no error is raised; soft requirements are only considered in a second pass.
1 parent cf4d8ae commit 4cd992c
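
For illustration, the snippet below is a minimal, self-contained sketch of the per-pass filtering this commit introduces in `get_aggregate_expr_req` (see the diff below). The enum is a simplified stand-in for `datafusion_expr::utils::AggregateOrderSensitivity`; the real function operates on an `AggregateFunctionExpr` and returns an optional `LexOrdering` rather than a bool.

// Simplified stand-in for `datafusion_expr::utils::AggregateOrderSensitivity`.
#[derive(Clone, Copy, Debug)]
enum OrderSensitivity {
    Insensitive,
    HardRequirement,
    Beneficial,
}

// Returns whether an aggregate's ORDER BY should be turned into an input
// sort requirement in the current pass. Beneficial ("soft") requirements
// are only picked up when `include_soft_requirement` is true, i.e. in the
// second pass.
fn requirement_applies(sensitivity: OrderSensitivity, include_soft_requirement: bool) -> bool {
    match sensitivity {
        OrderSensitivity::Insensitive => false,
        OrderSensitivity::HardRequirement => true,
        OrderSensitivity::Beneficial => include_soft_requirement,
    }
}

fn main() {
    // Pass 1 (hard requirements only): beneficial orderings are ignored.
    assert!(!requirement_applies(OrderSensitivity::Beneficial, false));
    // Pass 2: beneficial orderings are requested as well.
    assert!(requirement_applies(OrderSensitivity::Beneficial, true));
    // Hard requirements apply in both passes.
    assert!(requirement_applies(OrderSensitivity::HardRequirement, false));
}

In `get_finer_aggregate_exprs_requirement`, the same idea drives the new outer `for include_soft_requirement in [false, true]` loop: hard requirements are merged first and a conflict among them still yields a `not_impl_err!`, while beneficial requirements are layered on in the second pass and simply dropped when they conflict.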

6 files changed: +184 -162 lines changed


datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 78 additions & 52 deletions

@@ -52,6 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortRequirement,
 };

+use datafusion_expr::utils::AggregateOrderSensitivity;
 use itertools::Itertools;

 pub(crate) mod group_values;
@@ -1071,13 +1072,25 @@ fn get_aggregate_expr_req(
     aggr_expr: &AggregateFunctionExpr,
     group_by: &PhysicalGroupBy,
     agg_mode: &AggregateMode,
+    include_soft_requirement: bool,
 ) -> Option<LexOrdering> {
-    // If the aggregation function is ordering requirement is not absolutely
-    // necessary, or the aggregation is performing a "second stage" calculation,
-    // then ignore the ordering requirement.
-    if !aggr_expr.order_sensitivity().hard_requires() || !agg_mode.is_first_stage() {
+    // If the aggregation is performing a "second stage" calculation,
+    // then ignore the ordering requirement. Ordering requirement applies
+    // only to the aggregation input data.
+    if !agg_mode.is_first_stage() {
         return None;
     }
+
+    match aggr_expr.order_sensitivity() {
+        AggregateOrderSensitivity::Insensitive => return None,
+        AggregateOrderSensitivity::HardRequirement => {}
+        AggregateOrderSensitivity::Beneficial => {
+            if !include_soft_requirement {
+                return None;
+            }
+        }
+    }
+
     let mut sort_exprs = aggr_expr.order_bys().to_vec();
     // In non-first stage modes, we accumulate data (using `merge_batch`) from
     // different partitions (i.e. merge partial results). During this merge, we
@@ -1142,60 +1155,73 @@ pub fn get_finer_aggregate_exprs_requirement(
     agg_mode: &AggregateMode,
 ) -> Result<Vec<PhysicalSortRequirement>> {
     let mut requirement = None;
-    for aggr_expr in aggr_exprs.iter_mut() {
-        let Some(aggr_req) = get_aggregate_expr_req(aggr_expr, group_by, agg_mode)
-            .and_then(|o| eq_properties.normalize_sort_exprs(o))
-        else {
-            // There is no aggregate ordering requirement, or it is trivially
-            // satisfied -- we can skip this expression.
-            continue;
-        };
-        // If the common requirement is finer than the current expression's,
-        // we can skip this expression. If the latter is finer than the former,
-        // adopt it if it is satisfied by the equivalence properties. Otherwise,
-        // defer the analysis to the reverse expression.
-        let forward_finer = determine_finer(&requirement, &aggr_req);
-        if let Some(finer) = forward_finer {
-            if !finer {
-                continue;
-            } else if eq_properties.ordering_satisfy(aggr_req.clone())? {
-                requirement = Some(aggr_req);
-                continue;
-            }
-        }
-        if let Some(reverse_aggr_expr) = aggr_expr.reverse_expr() {
-            let Some(rev_aggr_req) =
-                get_aggregate_expr_req(&reverse_aggr_expr, group_by, agg_mode)
-                    .and_then(|o| eq_properties.normalize_sort_exprs(o))
-            else {
-                // The reverse requirement is trivially satisfied -- just reverse
-                // the expression and continue with the next one:
-                *aggr_expr = Arc::new(reverse_aggr_expr);
+
+    for include_soft_requirement in [false, true] {
+        for aggr_expr in aggr_exprs.iter_mut() {
+            let Some(aggr_req) = get_aggregate_expr_req(
+                aggr_expr,
+                group_by,
+                agg_mode,
+                include_soft_requirement,
+            )
+            .and_then(|o| eq_properties.normalize_sort_exprs(o)) else {
+                // There is no aggregate ordering requirement, or it is trivially
+                // satisfied -- we can skip this expression.
                 continue;
             };
-            // If the common requirement is finer than the reverse expression's,
-            // just reverse it and continue the loop with the next aggregate
-            // expression. If the latter is finer than the former, adopt it if
-            // it is satisfied by the equivalence properties. Otherwise, adopt
-            // the forward expression.
-            if let Some(finer) = determine_finer(&requirement, &rev_aggr_req) {
+            // If the common requirement is finer than the current expression's,
+            // we can skip this expression. If the latter is finer than the former,
+            // adopt it if it is satisfied by the equivalence properties. Otherwise,
+            // defer the analysis to the reverse expression.
+            let forward_finer = determine_finer(&requirement, &aggr_req);
+            if let Some(finer) = forward_finer {
                 if !finer {
+                    continue;
+                } else if eq_properties.ordering_satisfy(aggr_req.clone())? {
+                    requirement = Some(aggr_req);
+                    continue;
+                }
+            }
+            if let Some(reverse_aggr_expr) = aggr_expr.reverse_expr() {
+                let Some(rev_aggr_req) = get_aggregate_expr_req(
+                    &reverse_aggr_expr,
+                    group_by,
+                    agg_mode,
+                    include_soft_requirement,
+                )
+                .and_then(|o| eq_properties.normalize_sort_exprs(o)) else {
+                    // The reverse requirement is trivially satisfied -- just reverse
+                    // the expression and continue with the next one:
                     *aggr_expr = Arc::new(reverse_aggr_expr);
-                } else if eq_properties.ordering_satisfy(rev_aggr_req.clone())? {
-                    *aggr_expr = Arc::new(reverse_aggr_expr);
-                    requirement = Some(rev_aggr_req);
-                } else {
+                    continue;
+                };
+                // If the common requirement is finer than the reverse expression's,
+                // just reverse it and continue the loop with the next aggregate
+                // expression. If the latter is finer than the former, adopt it if
+                // it is satisfied by the equivalence properties. Otherwise, adopt
+                // the forward expression.
+                if let Some(finer) = determine_finer(&requirement, &rev_aggr_req) {
+                    if !finer {
+                        *aggr_expr = Arc::new(reverse_aggr_expr);
+                    } else if eq_properties.ordering_satisfy(rev_aggr_req.clone())? {
+                        *aggr_expr = Arc::new(reverse_aggr_expr);
+                        requirement = Some(rev_aggr_req);
+                    } else {
+                        requirement = Some(aggr_req);
+                    }
+                } else if forward_finer.is_some() {
                     requirement = Some(aggr_req);
+                } else {
+                    // Neither the existing requirement nor the current aggregate
+                    // requirement satisfy the other (forward or reverse), this
+                    // means they are conflicting. This is a problem only for hard
+                    // requirements. Unsatisfied soft requirements can be ignored.
+                    if !include_soft_requirement {
+                        return not_impl_err!(
+                            "Conflicting ordering requirements in aggregate functions is not supported"
+                        );
+                    }
                 }
-        } else if forward_finer.is_some() {
-            requirement = Some(aggr_req);
-        } else {
-            // Neither the existing requirement nor the current aggregate
-            // requirement satisfy the other (forward or reverse), this
-            // means they are conflicting.
-            return not_impl_err!(
-                "Conflicting ordering requirements in aggregate functions is not supported"
-            );
-        }
             }
         }
     }

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 5 additions & 4 deletions

@@ -282,8 +282,9 @@ physical_plan
 01)AggregateExec: mode=Final, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
 02)--CoalescePartitionsExec
 03)----AggregateExec: mode=Partial, gby=[], aggr=[array_agg(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
+04)------SortExec: expr=[c2@1 DESC, c3@2 ASC NULLS LAST], preserve_partitioning=[true]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true

 # test array_agg_order with list data type
 statement ok
@@ -6353,7 +6354,7 @@ logical_plan
 01)Aggregate: groupBy=[[]], aggr=[[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
 02)--TableScan: convert_first_last_table projection=[c1, c3]
 physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
+01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
 02)--CoalescePartitionsExec
 03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
@@ -6367,7 +6368,7 @@ logical_plan
 01)Aggregate: groupBy=[[]], aggr=[[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
 02)--TableScan: convert_first_last_table projection=[c1, c2]
 physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
+01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
 02)--CoalescePartitionsExec
 03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1

datafusion/sqllogictest/test_files/distinct_on.slt

Lines changed: 3 additions & 2 deletions

@@ -101,8 +101,9 @@ physical_plan
 05)--------CoalesceBatchesExec: target_batch_size=8192
 06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
 07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[first_value(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], first_value(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true
+08)--------------SortExec: expr=[c3@2 ASC NULLS LAST], preserve_partitioning=[true]
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true

 # ON expressions are not a sub-set of the ORDER BY expressions
 query error SELECT DISTINCT ON expressions must match initial ORDER BY expressions
