-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Support multiple ordered array_agg aggregations
#16625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
c32eb2f
Validate states shape in merge_batch
findepi cf4d8ae
Support multiple ordered array_agg
findepi 4cd992c
Generate sorts based on aggregations soft requirements
findepi 9b7e94d
Fix reversing first_value, last_value
findepi a551e7d
Merge remote-tracking branch 'upstream/main' into findepi/two-ordered…
findepi 3bacde9
Revert "Fix reversing first_value, last_value"
findepi 90db3d2
sort array_agg input the old way whenever possible
findepi 5f00ec4
revert some now unnecessary change
findepi 134da5a
Improve doc for SoftRequiement
findepi 1420f8d
Add comment for include_soft_requirement
findepi 5c1bce9
Document include_soft_requirement param
findepi 8a1abe8
fmt
findepi a1031e0
doc fix
findepi File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ | |
|
|
||
| use std::cmp::Ordering; | ||
| use std::collections::{HashSet, VecDeque}; | ||
| use std::mem::{size_of, size_of_val}; | ||
| use std::mem::{size_of, size_of_val, take}; | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow::array::{ | ||
|
|
@@ -31,14 +31,17 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields}; | |
|
|
||
| use datafusion_common::cast::as_list_array; | ||
| use datafusion_common::scalar::copy_array_data; | ||
| use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder}; | ||
| use datafusion_common::utils::{ | ||
| compare_rows, get_row_at_idx, take_function_args, SingleRowListArrayBuilder, | ||
| }; | ||
| use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; | ||
| use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; | ||
| use datafusion_expr::utils::format_state_name; | ||
| use datafusion_expr::{ | ||
| Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility, | ||
| }; | ||
| use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays; | ||
| use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; | ||
| use datafusion_functions_aggregate_common::utils::ordering_fields; | ||
| use datafusion_macros::user_doc; | ||
| use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; | ||
|
|
@@ -78,12 +81,14 @@ This aggregation function can only mix DISTINCT and ORDER BY if the ordering exp | |
| /// ARRAY_AGG aggregate expression | ||
| pub struct ArrayAgg { | ||
| signature: Signature, | ||
| is_input_pre_ordered: bool, | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding this new field should trigger adding equals/hash_value implementations. |
||
| } | ||
|
|
||
| impl Default for ArrayAgg { | ||
| fn default() -> Self { | ||
| Self { | ||
| signature: Signature::any(1, Volatility::Immutable), | ||
| is_input_pre_ordered: false, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -144,6 +149,20 @@ impl AggregateUDFImpl for ArrayAgg { | |
| Ok(fields) | ||
| } | ||
|
|
||
| fn order_sensitivity(&self) -> AggregateOrderSensitivity { | ||
| AggregateOrderSensitivity::SoftRequirement | ||
| } | ||
|
|
||
| fn with_beneficial_ordering( | ||
| self: Arc<Self>, | ||
| beneficial_ordering: bool, | ||
| ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> { | ||
| Ok(Some(Arc::new(Self { | ||
| signature: self.signature.clone(), | ||
| is_input_pre_ordered: beneficial_ordering, | ||
| }))) | ||
| } | ||
|
|
||
| fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { | ||
| let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; | ||
| let ignore_nulls = | ||
|
|
@@ -196,6 +215,7 @@ impl AggregateUDFImpl for ArrayAgg { | |
| &data_type, | ||
| &ordering_dtypes, | ||
| ordering, | ||
| self.is_input_pre_ordered, | ||
| acc_args.is_reversed, | ||
| ignore_nulls, | ||
| ) | ||
|
|
@@ -518,6 +538,8 @@ pub(crate) struct OrderSensitiveArrayAggAccumulator { | |
| datatypes: Vec<DataType>, | ||
| /// Stores the ordering requirement of the `Accumulator`. | ||
| ordering_req: LexOrdering, | ||
| /// Whether the input is known to be pre-ordered | ||
| is_input_pre_ordered: bool, | ||
| /// Whether the aggregation is running in reverse. | ||
| reverse: bool, | ||
| /// Whether the aggregation should ignore null values. | ||
|
|
@@ -531,6 +553,7 @@ impl OrderSensitiveArrayAggAccumulator { | |
| datatype: &DataType, | ||
| ordering_dtypes: &[DataType], | ||
| ordering_req: LexOrdering, | ||
| is_input_pre_ordered: bool, | ||
| reverse: bool, | ||
| ignore_nulls: bool, | ||
| ) -> Result<Self> { | ||
|
|
@@ -541,11 +564,34 @@ impl OrderSensitiveArrayAggAccumulator { | |
| ordering_values: vec![], | ||
| datatypes, | ||
| ordering_req, | ||
| is_input_pre_ordered, | ||
| reverse, | ||
| ignore_nulls, | ||
| }) | ||
| } | ||
|
|
||
| fn sort(&mut self) { | ||
| let sort_options = self | ||
| .ordering_req | ||
| .iter() | ||
| .map(|sort_expr| sort_expr.options) | ||
| .collect::<Vec<_>>(); | ||
| let mut values = take(&mut self.values) | ||
| .into_iter() | ||
| .zip(take(&mut self.ordering_values)) | ||
| .collect::<Vec<_>>(); | ||
| let mut delayed_cmp_err = Ok(()); | ||
| values.sort_by(|(_, left_ordering), (_, right_ordering)| { | ||
| compare_rows(left_ordering, right_ordering, &sort_options).unwrap_or_else( | ||
| |err| { | ||
| delayed_cmp_err = Err(err); | ||
| Ordering::Equal | ||
| }, | ||
| ) | ||
| }); | ||
| (self.values, self.ordering_values) = values.into_iter().unzip(); | ||
| } | ||
|
|
||
| fn evaluate_orderings(&self) -> Result<ScalarValue> { | ||
| let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]); | ||
|
|
||
|
|
@@ -616,9 +662,8 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { | |
| // inside `ARRAY_AGG` list, we will receive an `Array` that stores values | ||
| // received from its ordering requirement expression. (This information | ||
| // is necessary for during merging). | ||
| let [array_agg_values, agg_orderings, ..] = &states else { | ||
| return exec_err!("State should have two elements"); | ||
| }; | ||
| let [array_agg_values, agg_orderings] = | ||
| take_function_args("OrderSensitiveArrayAggAccumulator::merge_batch", states)?; | ||
| let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() else { | ||
| return exec_err!("Expects to receive a list array"); | ||
| }; | ||
|
|
@@ -629,8 +674,11 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { | |
| let mut partition_ordering_values = vec![]; | ||
|
|
||
| // Existing values should be merged also. | ||
| partition_values.push(self.values.clone().into()); | ||
| partition_ordering_values.push(self.ordering_values.clone().into()); | ||
| if !self.is_input_pre_ordered { | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self.sort(); | ||
| } | ||
| partition_values.push(take(&mut self.values).into()); | ||
| partition_ordering_values.push(take(&mut self.ordering_values).into()); | ||
|
|
||
| // Convert array to Scalars to sort them easily. Convert back to array at evaluation. | ||
| let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; | ||
|
|
@@ -679,13 +727,21 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { | |
| } | ||
|
|
||
| fn state(&mut self) -> Result<Vec<ScalarValue>> { | ||
| if !self.is_input_pre_ordered { | ||
| self.sort(); | ||
| } | ||
|
|
||
| let mut result = vec![self.evaluate()?]; | ||
| result.push(self.evaluate_orderings()?); | ||
|
|
||
| Ok(result) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> Result<ScalarValue> { | ||
| if !self.is_input_pre_ordered { | ||
| self.sort(); | ||
| } | ||
|
|
||
| if self.values.is_empty() { | ||
| return Ok(ScalarValue::new_null_list( | ||
| self.datatypes[0].clone(), | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically speaking this is an FFI API change -- I am not sure what implication that has (note this would not be released until DataFusion 50 anyways).
cc @timsaucer -- I wonder if we should gather up the FFI breaking changes into their own PR / more carefully schedule such breakages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new thing doesn't need to be supported in the FFI.
However, i didn't know how to avoid adding this.
When looking at
impl From<AggregateOrderSensitivity> for FFI_AggregateOrderSensitivityi am under impression that this particular part of FFI API is tightly coupled with the datafusion core, so in this particular place it cannot deliver API stability without inhibiting datafusion core progress. The necessary solution might be replacing thisFromwithTryFrom, same with (impl From<FFI_AggregateOrderSensitivity> for AggregateOrderSensitivity).My understanding is that, by tightly coupling
AggregateOrderSensitivityandFFI_AggregateOrderSensitivitycode author chose to let these enums naturally evolve over time, considering this not a breaking change, or an acceptable breaking change.