-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Change ScalarValue::Struct to ArrayRef #7893
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a6ebe74
2c2503c
177e72b
9efa792
b18eb25
608bdd7
4088750
8e8233f
fcce7f6
7e7e8bb
6b6f6e2
1a8a29c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,10 @@ use crate::{ | |
| use arrow::array::{Array, ArrayRef}; | ||
| use arrow::datatypes::{DataType, Field}; | ||
| use arrow_array::cast::AsArray; | ||
| use arrow_array::{new_empty_array, StructArray}; | ||
| use arrow_schema::{Fields, SortOptions}; | ||
|
|
||
| use datafusion_common::utils::array_into_list_array; | ||
| use datafusion_common::utils::{compare_rows, get_row_at_idx}; | ||
| use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue}; | ||
| use datafusion_expr::Accumulator; | ||
|
|
@@ -219,6 +222,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { | |
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| // First entry in the state is the aggregation result. Second entry | ||
| // stores values received for ordering requirement columns for each | ||
| // aggregation value inside `ARRAY_AGG` list. For each `StructArray` | ||
|
|
@@ -241,41 +245,49 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { | |
| partition_values.push(self.values.clone().into()); | ||
| partition_ordering_values.push(self.ordering_values.clone().into()); | ||
|
|
||
| // Convert array to Scalars to sort them easily. Convert back to array at evaluation. | ||
| let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; | ||
|
|
||
| for v in array_agg_res.into_iter() { | ||
| partition_values.push(v.into()); | ||
| } | ||
|
|
||
| let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; | ||
|
|
||
| let ordering_values = orderings.into_iter().map(|partition_ordering_rows| { | ||
| for partition_ordering_rows in orderings.into_iter() { | ||
| // Extract value from struct to ordering_rows for each group/partition | ||
| partition_ordering_rows.into_iter().map(|ordering_row| { | ||
| if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row { | ||
| Ok(ordering_columns_per_row) | ||
| } else { | ||
| exec_err!( | ||
| "Expects to receive ScalarValue::Struct(Some(..), _) but got: {:?}", | ||
| ordering_row.data_type() | ||
| ) | ||
| } | ||
| }).collect::<Result<VecDeque<_>>>() | ||
| }).collect::<Result<Vec<_>>>()?; | ||
| for ordering_values in ordering_values.into_iter() { | ||
| partition_ordering_values.push(ordering_values); | ||
| let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| { | ||
| if let ScalarValue::Struct(s) = ordering_row { | ||
| let mut ordering_columns_per_row = vec![]; | ||
|
|
||
| for column in s.columns() { | ||
| let sv = ScalarValue::try_from_array(column, 0)?; | ||
| ordering_columns_per_row.push(sv); | ||
| } | ||
|
|
||
| Ok(ordering_columns_per_row) | ||
| } else { | ||
| exec_err!( | ||
| "Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}", | ||
| ordering_row.data_type() | ||
| ) | ||
| } | ||
| }).collect::<Result<VecDeque<_>>>()?; | ||
|
|
||
| partition_ordering_values.push(ordering_value); | ||
| } | ||
|
|
||
| let sort_options = self | ||
| .ordering_req | ||
| .iter() | ||
| .map(|sort_expr| sort_expr.options) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| (self.values, self.ordering_values) = merge_ordered_arrays( | ||
| &mut partition_values, | ||
| &mut partition_ordering_values, | ||
| &sort_options, | ||
| )?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -323,20 +335,32 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { | |
| impl OrderSensitiveArrayAggAccumulator { | ||
| fn evaluate_orderings(&self) -> Result<ScalarValue> { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mustafasrepo Have you considered changing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can file a follow on ticket to track this idea |
||
| let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]); | ||
| let struct_field = Fields::from(fields); | ||
|
|
||
| let orderings: Vec<ScalarValue> = self | ||
| .ordering_values | ||
| .iter() | ||
| .map(|ordering| { | ||
| ScalarValue::Struct(Some(ordering.clone()), struct_field.clone()) | ||
| }) | ||
| .collect(); | ||
| let struct_type = DataType::Struct(struct_field); | ||
| let num_columns = fields.len(); | ||
| let struct_field = Fields::from(fields.clone()); | ||
|
|
||
| let mut column_wise_ordering_values = vec![]; | ||
| for i in 0..num_columns { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there might be a better design for StructArray (previous design is based on old ScalarValue::Struct). I avoid changing the logic or data structure in this PR. May benefit #8558?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| let column_values = self | ||
| .ordering_values | ||
| .iter() | ||
| .map(|x| x[i].clone()) | ||
| .collect::<Vec<_>>(); | ||
| let array = if column_values.is_empty() { | ||
| new_empty_array(fields[i].data_type()) | ||
| } else { | ||
| ScalarValue::iter_to_array(column_values.into_iter())? | ||
| }; | ||
| column_wise_ordering_values.push(array); | ||
| } | ||
|
|
||
| // Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases | ||
| let arr = ScalarValue::new_list(&orderings, &struct_type); | ||
| Ok(ScalarValue::List(arr)) | ||
| let ordering_array = StructArray::try_new( | ||
| struct_field.clone(), | ||
| column_wise_ordering_values, | ||
| None, | ||
| )?; | ||
| Ok(ScalarValue::List(Arc::new(array_into_list_array( | ||
| Arc::new(ordering_array), | ||
| )))) | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.