Support multiple ordered array_agg aggregations
#16625
Due to `..` in the pattern, the `OrderSensitiveArrayAggAccumulator::merge_batch` did not validate it's not receiving additional states columns it ignores. Update the code to check number of inputs.
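The effect of the `..` rest pattern can be illustrated with a small, self-contained sketch. It does not reproduce the real `merge_batch` signature: `Column` is a stand-in for an Arrow array, and both function names are hypothetical.

```rust
/// Stand-in for an Arrow column; the real accumulator receives Arrow arrays.
type Column = Vec<i64>;

/// Lenient: the `..` rest pattern matches any number of trailing columns,
/// so unexpected extra state columns are silently ignored.
fn split_states_lenient(states: &[Column]) -> Result<(&Column, &Column), String> {
    match states {
        [values, orderings, ..] => Ok((values, orderings)),
        _ => Err(format!("expected at least 2 state columns, got {}", states.len())),
    }
}

/// Strict: without `..`, the pattern matches exactly two columns,
/// so a wrong number of inputs is reported instead of ignored.
fn split_states_strict(states: &[Column]) -> Result<(&Column, &Column), String> {
    match states {
        [values, orderings] => Ok((values, orderings)),
        _ => Err(format!("expected exactly 2 state columns, got {}", states.len())),
    }
}
```

With three state columns, the lenient variant succeeds and drops the third column, while the strict variant returns an error.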
Before the change, `array_agg` with ordering would depend on input being ordered. As a result, it was impossible to do two or more `array_agg(x ORDER BY ...)` with incompatible ordering. This change moves ordering responsibility into `OrderSensitiveArrayAggAccumulator`. When input is pre-ordered (beneficial ordering), no additional work is done. However, when it's not, `array_agg` accumulator will order the data on its own.
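A minimal sketch of the idea, assuming a simplified accumulator that keeps each value next to a single integer ordering key. `OrderedArrayAgg` and its methods are hypothetical and much simpler than `OrderSensitiveArrayAggAccumulator`; only the `is_input_pre_ordered` flag name comes from the PR.

```rust
/// Hypothetical, simplified stand-in for an order-sensitive `array_agg`
/// accumulator: it stores each value next to its ordering key.
struct OrderedArrayAgg {
    /// True when the planner guarantees rows arrive in the requested order.
    is_input_pre_ordered: bool,
    /// True for `ORDER BY key DESC`.
    descending: bool,
    /// (ordering key, aggregated value)
    rows: Vec<(i64, String)>,
}

impl OrderedArrayAgg {
    fn new(is_input_pre_ordered: bool, descending: bool) -> Self {
        Self { is_input_pre_ordered, descending, rows: Vec::new() }
    }

    fn update(&mut self, key: i64, value: &str) {
        self.rows.push((key, value.to_string()));
    }

    fn evaluate(mut self) -> Vec<String> {
        if !self.is_input_pre_ordered {
            // The input order cannot be trusted, so sort locally.
            // `sort_by` is stable: rows with equal keys keep arrival order.
            let descending = self.descending;
            self.rows.sort_by(|a, b| {
                let ord = a.0.cmp(&b.0);
                if descending { ord.reverse() } else { ord }
            });
        }
        // Pre-ordered input is returned as-is, with no extra work.
        self.rows.into_iter().map(|(_, value)| value).collect()
    }
}
```

Because each accumulator sorts its own rows, two accumulators with incompatible orderings (one ascending, one descending) can consume the same unsorted input.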
ozankabak
left a comment
A few comments:
- `FIRST_VALUE` and `LAST_VALUE` implementations use `requirement_satisfied` and `with_requirement_satisfied` names for this thing. IMO it would be a good idea to follow suit here for consistency/readability purposes.
- There are some changes where the planner was able to get away with appending a key to an already-existing sort, and operate the accumulator in an efficient mode. This PR loses that. Good news is that we already have the mechanism to address this in place (i.e. `OrderingRequirements`) and the solution is not hard. Aggregations involving functions that benefit from existing ordering should return `OrderingRequirements::Soft`, and the `enforce_sorting` rule should leverage any already existing sort (if present).
Thanks for your review, @ozankabak.
Will align for array_agg; I am all for consistency. Do you think we could update
Yes, I noticed the PR drops some redundant sorts, which is likely quite a big improvement for aggregations with GROUP BY. Am I reading this correctly? I agree that in some rare situations those sorts are still the optimal way to go. However, I don't see a way for UDAF to declare
Sorry, with |
I don't think it matters and I think the name was already kind of obvious, but since you already opened a PR for it I went ahead and approved it.
Regarding your question about plan changes, I don't think we have enough information to call it scope creep -- the PR adds a new capability that we were working towards for a while, but also loses something that already exists. That was actually the main reason why we didn't take the plunge just yet as we worked on this over the past few months.
If getting this over the finish line in the short term is important to you, I think a reasonable step forward is to take a look at what solving it entails: There are basically two steps: (1) AggregateExec needs to consult the UDAF definition as it forms its required input ordering (this is the easy step), (2) The enforce sorting rule needs to address the case when there is already a sort with a prefix of a soft requirement, and just extend the sort keys (this is the harder step).
I think the solution will ultimately be small in terms of LOC changes, but the second step will require some thinking. If making an attempt at solving reveals that the problem has challenges that we don't foresee now, we can reconsider whether we want to accept the plan changes and open an issue to track the work that I described in the above paragraph. Thanks
I think global array_agg is not a very interesting scenario and a grouped array_agg no longer requires global sorting. |
alamb
left a comment
Thanks @findepi -- the idea makes sense to me
My only concern is switching to sorting `ScalarValue`s rather than arrays (as in SortExec), as it might be quite a bit slower
There is a current related outstanding PR from @sfluor :
I think @rluvaton is familiar with this code too. Perhaps he has some time to review as well?
| 04)------SortExec: expr=[c2@1 DESC, c3@2 ASC NULLS LAST], preserve_partitioning=[true] | ||
| 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | ||
| 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], file_type=csv, has_header=true | ||
| 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
I wonder if we should benchmark this -- sorting by ScalarValue is likely a lot less efficient than using the fast array sort / etc that is done in SortExec
For global aggregation -- agreed. But then, a global array_agg aggregation cannot feasibly operate on large amounts of data, can it? (Or rather: it can, but that's unlikely to be a common scenario.)
@alamb i pushed some changes following @ozankabak 's suggestions and the plans have (expectedly) changed. Can you take another look?
The sorting consideration before aggregations did respect only ordered aggregation functions with `AggregateOrderSensitivity::HardRequirement`. This change includes sorting expectations from `AggregateOrderSensitivity::Beneficial` functions. When beneficial ordered function requirements are not satisfied, no error is raised, they are considered in the second pass only.
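One plausible reading of this two-pass scheme, as a self-contained sketch: the first pass merges only hard requirements and fails on conflict, the second pass adds soft requirements and silently skips those that conflict. The `Agg` struct, the string sort keys, and both function names are hypothetical; only the `include_soft_requirement` name and the enum variant names appear in the PR.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Sensitivity {
    Insensitive,
    HardRequirement,
    SoftRequirement,
}

/// Hypothetical aggregate description: sensitivity plus requested sort keys.
struct Agg {
    sensitivity: Sensitivity,
    order_by: Vec<&'static str>,
}

/// Returns the longer ordering if one is a prefix of the other.
fn finer<'a>(a: &'a [&'static str], b: &'a [&'static str]) -> Option<&'a [&'static str]> {
    if a.starts_with(b) {
        Some(a)
    } else if b.starts_with(a) {
        Some(b)
    } else {
        None
    }
}

fn required_input_ordering(aggs: &[Agg]) -> Result<Vec<&'static str>, String> {
    let mut required: Vec<&'static str> = Vec::new();
    // Pass 1 handles hard requirements, pass 2 handles soft ones.
    for include_soft_requirement in [false, true] {
        for agg in aggs {
            let considered = match agg.sensitivity {
                Sensitivity::HardRequirement => !include_soft_requirement,
                Sensitivity::SoftRequirement => include_soft_requirement,
                Sensitivity::Insensitive => false,
            };
            if !considered {
                continue;
            }
            let merged = finer(&required, &agg.order_by).map(|o| o.to_vec());
            match merged {
                Some(ordering) => required = ordering,
                // A conflicting soft requirement is skipped, not an error.
                None if include_soft_requirement => {}
                None => return Err(format!("conflicting ordering {:?}", agg.order_by)),
            }
        }
    }
    Ok(required)
}
```

In this sketch a hard requirement on `a` and a soft requirement on `a, b` merge into `a, b`, while an additional soft requirement on `c` is ignored rather than rejected.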
| DataFusion error: Internal error: Input field name last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] does not match with the projection expression first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]. | ||
| This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker |
Unblocking beneficial ordering triggered an error around reversing first_value / last_value.
Fix coming.
Upon reversing, a schema and field mismatch would happen.
| ReversedUDAF::NotSupported => None, | ||
| ReversedUDAF::Identical => Some(self.clone()), | ||
| ReversedUDAF::Reversed(reverse_udf) => { | ||
| let mut name = self.name().to_string(); |
I think removing this may have other unintended effects. I will request some more eyes on this
Thanks!
I agree this code was deliberate & nice. I hope we don't parse those names though.
If there is a better solution to agg reverse causing failures (#16625 (comment)), let me know. I can also drop this fix; I don't like it either.
Alternatively to the fix, I can block reversing for beneficial functions and thus hide the problem for now. Would it be preferred for this PR?
alamb
left a comment
I took another quick look --- thank you @findepi
I am worried about the unintended consequences of this change (mostly because I don't understand the code well enough to know what the invariants are / if we are breaking them).
Some of the plan changes definitely look wrong to me -- I am not sure if it is just a column naming thing or if the expressions are actually wrong now 🤔
| physical_plan | ||
| 01)ProjectionExec: expr=[country@0 as country, array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] | ||
| 02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] | ||
| 02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[array_agg(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], first_value(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], last_value(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] |
that certainly seems like an improvement
| 01)ProjectionExec: expr=[country@0 as country, first_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, sum(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] | ||
| 02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[first_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], last_value(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], sum(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] | ||
| 03)----DataSourceExec: partitions=1, partition_sizes=[1] | ||
| 03)----SortExec: expr=[ts@1 DESC], preserve_partitioning=[false] |
It is weird -- the comments say this plan should have a SortExec but the plan that is checked in does not have one
| 01)AggregateExec: mode=Final, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]] | ||
| 02)--CoalescePartitionsExec | ||
| 03)----AggregateExec: mode=Partial, gby=[], aggr=[last_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]] | ||
| 03)----AggregateExec: mode=Partial, gby=[], aggr=[first_value(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]] |
Now this makes it look like the plan is wrong:
- the query calls for `first_value(c1 ORDER BY c3 desc)` but the table is sorted by `c3 ASC`
I think internally the optimizer has rewritten first_value(c1 ORDER BY c3 desc) to last_value(c1 ORDER BY c3 ASC)
However this plan makes it look like that didn't happen
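The rewrite mentioned above relies on `first_value(... ORDER BY k DESC)` and `last_value(... ORDER BY k ASC)` selecting the same row. A small sketch of that equivalence, assuming distinct ordering keys (with ties the two can legitimately pick different rows); the function names and the tuple row layout are hypothetical:

```rust
/// `first_value(value ORDER BY key DESC)`: sort descending, take the first row.
fn first_value_desc(rows: &[(i64, &'static str)]) -> Option<&'static str> {
    let mut sorted = rows.to_vec();
    sorted.sort_by(|a, b| b.0.cmp(&a.0));
    sorted.first().map(|row| row.1)
}

/// `last_value(value ORDER BY key ASC)`: sort ascending, take the last row.
fn last_value_asc(rows: &[(i64, &'static str)]) -> Option<&'static str> {
    let mut sorted = rows.to_vec();
    sorted.sort_by(|a, b| a.0.cmp(&b.0));
    sorted.last().map(|row| row.1)
}
```

If the input is already sorted by the key ascending, the second form needs no sort at all, which is why the optimizer prefers it; the plan display is then expected to show the rewritten function name.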
There was renaming to address this, but the renaming did not exactly work -- #16625 (comment).
Agreed, I think this will need some time to brew. As I said previously, I hope to get some more eyes on this in the short term (maybe early next week) |
I think we will find a solution that avoids this redundancy. Expect some feedback from me (or someone on my team) in a few days |
@ozankabak @alamb can you please help me understand where you would want to go with this? or maybe DF doesn't need to support ordered array_aggs (more than one in a query)? |
I think supporting multiple ordered array_agg aggregations makes sense to me; I have not had a chance to review this PR recently. Is it ready for another review? |
alamb
left a comment
Thanks for pushing this forward @findepi
I personally think supporting multiple sorted aggregates is a useful feature and we should work on it. My only real concern with the code currently in this PR is that it may break the FFI API (which is ideally supposed to be stable)
I re-read the comments on this PR and I wonder if you tried implementing the solution suggested by @ozankabak in #16625 (comment):
If getting this over the finish line in the short term is important to you, I think a reasonable step forward is to take a look at what solving it entails: There are basically two steps: (1) AggregateExec needs to consult the UDAF definition as it forms its required input ordering (this is the easy step), (2) The enforce sorting rule needs to address the case when there is already a sort with a prefix of a soft requirement, and just extend the sort keys (this is the harder step).
I think the solution will ultimately be small in terms of LOC changes, but the second step will require some thinking.
This PR seems similar except that it adds the SoftRequirement stage as well. If we could avoid the need for SoftRequirement I think this PR would be pretty great
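Step (2) of the quoted suggestion, extending an existing sort that is a prefix of a soft requirement, could look roughly like the sketch below. This illustrates the decision only, under the assumption that sort keys can be compared as plain strings; `SortDecision` and `decide_sort` are hypothetical names, not part of the `enforce_sorting` rule.

```rust
#[derive(Debug, PartialEq)]
enum SortDecision {
    /// The existing sort already satisfies the soft requirement.
    AlreadySatisfied,
    /// The existing sort is a proper prefix: append the missing keys.
    ExtendTo(Vec<String>),
    /// No usable sort exists: do not add one just for a soft requirement.
    LeaveUnsorted,
}

fn decide_sort(existing: &[&str], soft_requirement: &[&str]) -> SortDecision {
    if existing.starts_with(soft_requirement) {
        SortDecision::AlreadySatisfied
    } else if !existing.is_empty() && soft_requirement.starts_with(existing) {
        SortDecision::ExtendTo(soft_requirement.iter().map(|k| k.to_string()).collect())
    } else {
        SortDecision::LeaveUnsorted
    }
}
```

For example, an existing sort on `c2 DESC` with a soft requirement of `c2 DESC, c3 ASC` yields `ExtendTo`, matching the kind of plan where a key is appended to an already-present `SortExec`.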
| pub enum FFI_AggregateOrderSensitivity { | ||
| Insensitive, | ||
| HardRequirement, | ||
| SoftRequirement, |
Technically speaking this is an FFI API change -- I am not sure what implication that has (note this would not be released until DataFusion 50 anyways).
cc @timsaucer -- I wonder if we should gather up the FFI breaking changes into their own PR / more carefully schedule such breakages
This new thing doesn't need to be supported in the FFI.
However, I didn't know how to avoid adding this.
When looking at `impl From<AggregateOrderSensitivity> for FFI_AggregateOrderSensitivity`, I am under the impression that this particular part of the FFI API is tightly coupled with the datafusion core, so in this particular place it cannot deliver API stability without inhibiting datafusion core progress. The necessary solution might be replacing this `From` with `TryFrom`, and likewise for `impl From<FFI_AggregateOrderSensitivity> for AggregateOrderSensitivity`.
My understanding is that, by tightly coupling `AggregateOrderSensitivity` and `FFI_AggregateOrderSensitivity`, the code author chose to let these enums naturally evolve over time, considering this not a breaking change, or an acceptable breaking change.
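The `TryFrom` alternative can be sketched as follows. This is an illustration under a stated assumption, not the PR's code: here the FFI enum deliberately does not gain the new variant, so the conversion from the core enum becomes fallible. The enum names mirror the ones discussed, but the variant set of the FFI enum and the `to_ffi` helper are hypothetical.

```rust
use std::convert::TryFrom;

#[derive(Debug, Clone, Copy, PartialEq)]
enum AggregateOrderSensitivity {
    Insensitive,
    HardRequirement,
    SoftRequirement,
    Beneficial,
}

/// Assumption for this sketch: the FFI surface stays frozen without `SoftRequirement`.
#[repr(C)]
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum FFI_AggregateOrderSensitivity {
    Insensitive,
    HardRequirement,
    Beneficial,
}

impl TryFrom<AggregateOrderSensitivity> for FFI_AggregateOrderSensitivity {
    type Error = String;

    fn try_from(value: AggregateOrderSensitivity) -> Result<Self, Self::Error> {
        match value {
            AggregateOrderSensitivity::Insensitive => Ok(Self::Insensitive),
            AggregateOrderSensitivity::HardRequirement => Ok(Self::HardRequirement),
            AggregateOrderSensitivity::Beneficial => Ok(Self::Beneficial),
            // New core variants surface as errors instead of forcing an FFI change.
            AggregateOrderSensitivity::SoftRequirement => {
                Err("SoftRequirement has no FFI representation in this sketch".to_string())
            }
        }
    }
}

/// Hypothetical helper wrapping the fallible conversion.
fn to_ffi(value: AggregateOrderSensitivity) -> Result<FFI_AggregateOrderSensitivity, String> {
    FFI_AggregateOrderSensitivity::try_from(value)
}
```

The trade-off is that callers crossing the FFI boundary must handle the error case, in exchange for the core enum being free to grow.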
#16625 (comment) I am not convinced it's actually desired to make the existing Consider first_value / top_1 function, which is O(n). It benefits from the input being sorted, becoming O(1) in such case. It does not, however, want to impose input sorting, as that would be O(n log n). One can argue that sorting for "first_value order by a, b" can be added only if there already is some sorting on a. It's not a bad argument, but note that within the group of least a value, it's still O(group size) -> O(group size * log group size) change. Thus, it seems optimal to be able to distinguish functions that
Thus it makes sense for the |
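The cost argument for `first_value` can be made concrete with a short sketch, assuming rows are `(key, value)` tuples ordered by an integer key; both function names are hypothetical.

```rust
/// `first_value(value ORDER BY key)` over unsorted input:
/// a single O(n) scan for the smallest key, with no sort needed.
fn first_value_unsorted(rows: &[(i64, &'static str)]) -> Option<&'static str> {
    rows.iter().min_by_key(|row| row.0).map(|row| row.1)
}

/// The same aggregate when the input is already sorted by key:
/// the answer is simply the first row, which is O(1).
fn first_value_presorted(rows: &[(i64, &'static str)]) -> Option<&'static str> {
    rows.first().map(|row| row.1)
}
```

Forcing a sort just to reach the O(1) path would cost O(n log n), which is worse than the O(n) scan. That is the motivation for a function being able to say it benefits from an ordering without requiring it.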
I see -- my confusion stemmed from that I understand the theoretical difference between
What I think I am confused about is what is the practical difference between I believe the result is that DataFusion will attempt to sort the input according to the requirement, but if it can not (because it will cause a conflict with another aggregate function's requirements, for example) then the aggregate can still be run with the different ordering |
alamb
left a comment
I went through this PR carefully and it makes sense to me -- thank you @findepi
I still don't really understand the implications of the FFI change, but maybe they are ok.
I also kicked off a benchmark for array_agg to ensure this PR doesn't introduce regressions, and as long as that is good I think this PR is good to merge
🤖: Benchmark completed Details
I didn't have time to dig deeper on this, so we can go ahead with the merge. We can unify |
In my opinion, the benchmark results show no meaningful difference
Co-authored-by: Andrew Lamb <[email protected]>
Thank you @alamb for review and @ozankabak for feedback. |
Onwards. Thanks @findepi and @ozankabak |
* Validate states shape in merge_batch

  Due to `..` in the pattern, the `OrderSensitiveArrayAggAccumulator::merge_batch` did not validate it's not receiving additional states columns it ignores. Update the code to check number of inputs.

* Support multiple ordered array_agg

  Before the change, `array_agg` with ordering would depend on input being ordered. As a result, it was impossible to do two or more `array_agg(x ORDER BY ...)` with incompatible ordering. This change moves ordering responsibility into `OrderSensitiveArrayAggAccumulator`. When input is pre-ordered (beneficial ordering), no additional work is done. However, when it's not, `array_agg` accumulator will order the data on its own.

* Generate sorts based on aggregations soft requirements

  The sorting consideration before aggregations did respect only ordered aggregation functions with `AggregateOrderSensitivity::HardRequirement`. This change includes sorting expectations from `AggregateOrderSensitivity::Beneficial` functions. When beneficial ordered function requirements are not satisfied, no error is raised, they are considered in the second pass only.

* Fix reversing first_value, last_value

  Upon reversing, a schema and field mismatch would happen.

* Revert "Fix reversing first_value, last_value"

  This reverts commit 9b7e94d.

* sort array_agg input the old way whenever possible
* revert some now unnecessary change
* Improve doc for SoftRequirement

  Co-authored-by: Andrew Lamb <[email protected]>

* Add comment for include_soft_requirement

  Co-authored-by: Andrew Lamb <[email protected]>

* Document include_soft_requirement param
* fmt
* doc fix

---------

Co-authored-by: Andrew Lamb <[email protected]>
| /// ARRAY_AGG aggregate expression | ||
| pub struct ArrayAgg { | ||
| signature: Signature, | ||
| is_input_pre_ordered: bool, |
Adding this new field should trigger adding equals/hash_value implementations.
Being fixed in #17065
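Why the new field matters for equality and hashing can be shown with a sketch built on the standard `PartialEq` and `Hash` traits. This is an assumption-laden simplification: the comment above refers to DataFusion-specific equals/hash_value implementations, which are not reproduced here, and `ArrayAggSketch` and `hash_of` are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for `ArrayAgg`; `signature` is simplified to a string.
/// Deriving the traits makes every field, including the new flag,
/// take part in both equality and hashing.
#[derive(Debug, PartialEq, Eq, Hash)]
struct ArrayAggSketch {
    signature: String,
    is_input_pre_ordered: bool,
}

/// Computes a hash with the standard library's default hasher.
fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}
```

If equality ignored `is_input_pre_ordered`, two function instances that behave differently (one sorting on its own, one trusting the input order) would compare as equal and could be wrongly deduplicated.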
Which issue does this PR close?
None. #8582 is related.
Rationale for this change
Before the change, `array_agg` with ordering would depend on input being ordered. As a result, it was impossible to do two or more `array_agg(x ORDER BY ...)` with incompatible ordering.
What changes are included in this PR?
This change moves ordering responsibility into `OrderSensitiveArrayAggAccumulator`. When input is pre-ordered (beneficial ordering), no additional work is done. However, when it's not, `array_agg` accumulator will order the data on its own.
Are these changes tested?
Yes
Are there any user-facing changes?
Yes