diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index c7032e601cf86..4afa8d0dd5eca 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -36,13 +36,14 @@ use datafusion_common::{ use datafusion_expr::Accumulator; /// FIRST_VALUE aggregate expression -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FirstValue { name: String, input_data_type: DataType, order_by_data_types: Vec, expr: Arc, ordering_req: LexOrdering, + requirement_satisfied: bool, } impl FirstValue { @@ -54,12 +55,14 @@ impl FirstValue { ordering_req: LexOrdering, order_by_data_types: Vec, ) -> Self { + let requirement_satisfied = ordering_req.is_empty(); Self { name: name.into(), input_data_type, order_by_data_types, expr, ordering_req, + requirement_satisfied, } } @@ -87,6 +90,33 @@ impl FirstValue { pub fn ordering_req(&self) -> &LexOrdering { &self.ordering_req } + + pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { + self.requirement_satisfied = requirement_satisfied; + self + } + + pub fn convert_to_last(self) -> LastValue { + let name = if self.name.starts_with("FIRST") { + format!("LAST{}", &self.name[5..]) + } else { + format!("LAST_VALUE({})", self.expr) + }; + let FirstValue { + expr, + input_data_type, + ordering_req, + order_by_data_types, + .. + } = self; + LastValue::new( + expr, + name, + input_data_type, + reverse_order_bys(&ordering_req), + order_by_data_types, + ) + } } impl AggregateExpr for FirstValue { @@ -100,11 +130,14 @@ impl AggregateExpr for FirstValue { } fn create_accumulator(&self) -> Result> { - Ok(Box::new(FirstValueAccumulator::try_new( + FirstValueAccumulator::try_new( &self.input_data_type, &self.order_by_data_types, self.ordering_req.clone(), - )?)) + ) + .map(|acc| { + Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _ + }) } fn state_fields(&self) -> Result> { @@ -130,11 +163,7 @@ impl AggregateExpr for FirstValue { } fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - if self.ordering_req.is_empty() { - None - } else { - Some(&self.ordering_req) - } + (!self.ordering_req.is_empty()).then_some(&self.ordering_req) } fn name(&self) -> &str { @@ -142,26 +171,18 @@ impl AggregateExpr for FirstValue { } fn reverse_expr(&self) -> Option> { - let name = if self.name.starts_with("FIRST") { - format!("LAST{}", &self.name[5..]) - } else { - format!("LAST_VALUE({})", self.expr) - }; - Some(Arc::new(LastValue::new( - self.expr.clone(), - name, - self.input_data_type.clone(), - reverse_order_bys(&self.ordering_req), - self.order_by_data_types.clone(), - ))) + Some(Arc::new(self.clone().convert_to_last())) } fn create_sliding_accumulator(&self) -> Result> { - Ok(Box::new(FirstValueAccumulator::try_new( + FirstValueAccumulator::try_new( &self.input_data_type, &self.order_by_data_types, self.ordering_req.clone(), - )?)) + ) + .map(|acc| { + Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _ + }) } } @@ -190,6 +211,8 @@ struct FirstValueAccumulator { orderings: Vec, // Stores the applicable ordering requirement. ordering_req: LexOrdering, + // Stores whether incoming data already satisfies the ordering requirement. + requirement_satisfied: bool, } impl FirstValueAccumulator { @@ -203,42 +226,29 @@ impl FirstValueAccumulator { .iter() .map(ScalarValue::try_from) .collect::>>()?; - ScalarValue::try_from(data_type).map(|value| Self { - first: value, + let requirement_satisfied = ordering_req.is_empty(); + ScalarValue::try_from(data_type).map(|first| Self { + first, is_set: false, orderings, ordering_req, + requirement_satisfied, }) } // Updates state with the values in the given row. - fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { - let [value, orderings @ ..] = row else { - return internal_err!("Empty row in FIRST_VALUE"); - }; - // Update when there is no entry in the state, or we have an "earlier" - // entry according to sort requirements. - if !self.is_set - || compare_rows( - &self.orderings, - orderings, - &get_sort_options(&self.ordering_req), - )? - .is_gt() - { - self.first = value.clone(); - self.orderings = orderings.to_vec(); - self.is_set = true; - } - Ok(()) + fn update_with_new_row(&mut self, row: &[ScalarValue]) { + self.first = row[0].clone(); + self.orderings = row[1..].to_vec(); + self.is_set = true; } fn get_first_idx(&self, values: &[ArrayRef]) -> Result> { let [value, ordering_values @ ..] = values else { return internal_err!("Empty row in FIRST_VALUE"); }; - if self.ordering_req.is_empty() { - // Get first entry according to receive order (0th index) + if self.requirement_satisfied { + // Get first entry according to the pre-existing ordering (0th index): return Ok((!value.is_empty()).then_some(0)); } let sort_columns = ordering_values @@ -252,6 +262,11 @@ impl FirstValueAccumulator { let indices = lexsort_to_indices(&sort_columns, Some(1))?; Ok((!indices.is_empty()).then_some(indices.value(0) as _)) } + + fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { + self.requirement_satisfied = requirement_satisfied; + self + } } impl Accumulator for FirstValueAccumulator { @@ -263,9 +278,25 @@ impl Accumulator for FirstValueAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if let Some(first_idx) = self.get_first_idx(values)? { - let row = get_row_at_idx(values, first_idx)?; - self.update_with_new_row(&row)?; + if !self.is_set { + if let Some(first_idx) = self.get_first_idx(values)? { + let row = get_row_at_idx(values, first_idx)?; + self.update_with_new_row(&row); + } + } else if !self.requirement_satisfied { + if let Some(first_idx) = self.get_first_idx(values)? { + let row = get_row_at_idx(values, first_idx)?; + let orderings = &row[1..]; + if compare_rows( + &self.orderings, + orderings, + &get_sort_options(&self.ordering_req), + )? + .is_gt() + { + self.update_with_new_row(&row); + } + } } Ok(()) } @@ -294,12 +325,12 @@ impl Accumulator for FirstValueAccumulator { let sort_options = get_sort_options(&self.ordering_req); // Either there is no existing value, or there is an earlier version in new data. if !self.is_set - || compare_rows(first_ordering, &self.orderings, &sort_options)?.is_lt() + || compare_rows(&self.orderings, first_ordering, &sort_options)?.is_gt() { // Update with first value in the state. Note that we should exclude the // is_set flag from the state. Otherwise, we will end up with a state // containing two is_set flags. - self.update_with_new_row(&first_row[0..is_set_idx])?; + self.update_with_new_row(&first_row[0..is_set_idx]); } } Ok(()) @@ -318,13 +349,14 @@ impl Accumulator for FirstValueAccumulator { } /// LAST_VALUE aggregate expression -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct LastValue { name: String, input_data_type: DataType, order_by_data_types: Vec, expr: Arc, ordering_req: LexOrdering, + requirement_satisfied: bool, } impl LastValue { @@ -336,12 +368,14 @@ impl LastValue { ordering_req: LexOrdering, order_by_data_types: Vec, ) -> Self { + let requirement_satisfied = ordering_req.is_empty(); Self { name: name.into(), input_data_type, order_by_data_types, expr, ordering_req, + requirement_satisfied, } } @@ -369,6 +403,33 @@ impl LastValue { pub fn ordering_req(&self) -> &LexOrdering { &self.ordering_req } + + pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { + self.requirement_satisfied = requirement_satisfied; + self + } + + pub fn convert_to_first(self) -> FirstValue { + let name = if self.name.starts_with("LAST") { + format!("FIRST{}", &self.name[4..]) + } else { + format!("FIRST_VALUE({})", self.expr) + }; + let LastValue { + expr, + input_data_type, + ordering_req, + order_by_data_types, + .. + } = self; + FirstValue::new( + expr, + name, + input_data_type, + reverse_order_bys(&ordering_req), + order_by_data_types, + ) + } } impl AggregateExpr for LastValue { @@ -382,11 +443,14 @@ impl AggregateExpr for LastValue { } fn create_accumulator(&self) -> Result> { - Ok(Box::new(LastValueAccumulator::try_new( + LastValueAccumulator::try_new( &self.input_data_type, &self.order_by_data_types, self.ordering_req.clone(), - )?)) + ) + .map(|acc| { + Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _ + }) } fn state_fields(&self) -> Result> { @@ -412,11 +476,7 @@ impl AggregateExpr for LastValue { } fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - if self.ordering_req.is_empty() { - None - } else { - Some(&self.ordering_req) - } + (!self.ordering_req.is_empty()).then_some(&self.ordering_req) } fn name(&self) -> &str { @@ -424,26 +484,18 @@ impl AggregateExpr for LastValue { } fn reverse_expr(&self) -> Option> { - let name = if self.name.starts_with("LAST") { - format!("FIRST{}", &self.name[4..]) - } else { - format!("FIRST_VALUE({})", self.expr) - }; - Some(Arc::new(FirstValue::new( - self.expr.clone(), - name, - self.input_data_type.clone(), - reverse_order_bys(&self.ordering_req), - self.order_by_data_types.clone(), - ))) + Some(Arc::new(self.clone().convert_to_first())) } fn create_sliding_accumulator(&self) -> Result> { - Ok(Box::new(LastValueAccumulator::try_new( + LastValueAccumulator::try_new( &self.input_data_type, &self.order_by_data_types, self.ordering_req.clone(), - )?)) + ) + .map(|acc| { + Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _ + }) } } @@ -471,6 +523,8 @@ struct LastValueAccumulator { orderings: Vec, // Stores the applicable ordering requirement. ordering_req: LexOrdering, + // Stores whether incoming data already satisfies the ordering requirement. + requirement_satisfied: bool, } impl LastValueAccumulator { @@ -484,42 +538,28 @@ impl LastValueAccumulator { .iter() .map(ScalarValue::try_from) .collect::>>()?; - Ok(Self { - last: ScalarValue::try_from(data_type)?, + let requirement_satisfied = ordering_req.is_empty(); + ScalarValue::try_from(data_type).map(|last| Self { + last, is_set: false, orderings, ordering_req, + requirement_satisfied, }) } // Updates state with the values in the given row. - fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { - let [value, orderings @ ..] = row else { - return internal_err!("Empty row in LAST_VALUE"); - }; - // Update when there is no entry in the state, or we have a "later" - // entry (either according to sort requirements or the order of execution). - if !self.is_set - || self.orderings.is_empty() - || compare_rows( - &self.orderings, - orderings, - &get_sort_options(&self.ordering_req), - )? - .is_lt() - { - self.last = value.clone(); - self.orderings = orderings.to_vec(); - self.is_set = true; - } - Ok(()) + fn update_with_new_row(&mut self, row: &[ScalarValue]) { + self.last = row[0].clone(); + self.orderings = row[1..].to_vec(); + self.is_set = true; } fn get_last_idx(&self, values: &[ArrayRef]) -> Result> { let [value, ordering_values @ ..] = values else { return internal_err!("Empty row in LAST_VALUE"); }; - if self.ordering_req.is_empty() { + if self.requirement_satisfied { // Get last entry according to the order of data: return Ok((!value.is_empty()).then_some(value.len() - 1)); } @@ -538,6 +578,11 @@ impl LastValueAccumulator { let indices = lexsort_to_indices(&sort_columns, Some(1))?; Ok((!indices.is_empty()).then_some(indices.value(0) as _)) } + + fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { + self.requirement_satisfied = requirement_satisfied; + self + } } impl Accumulator for LastValueAccumulator { @@ -549,10 +594,26 @@ impl Accumulator for LastValueAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if let Some(last_idx) = self.get_last_idx(values)? { + if !self.is_set || self.requirement_satisfied { + if let Some(last_idx) = self.get_last_idx(values)? { + let row = get_row_at_idx(values, last_idx)?; + self.update_with_new_row(&row); + } + } else if let Some(last_idx) = self.get_last_idx(values)? { let row = get_row_at_idx(values, last_idx)?; - self.update_with_new_row(&row)?; + let orderings = &row[1..]; + // Update when there is a more recent entry + if compare_rows( + &self.orderings, + orderings, + &get_sort_options(&self.ordering_req), + )? + .is_lt() + { + self.update_with_new_row(&row); + } } + Ok(()) } @@ -583,12 +644,12 @@ impl Accumulator for LastValueAccumulator { // Either there is no existing value, or there is a newer (latest) // version in the new data: if !self.is_set - || compare_rows(last_ordering, &self.orderings, &sort_options)?.is_gt() + || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt() { // Update with last value in the state. Note that we should exclude the // is_set flag from the state. Otherwise, we will end up with a state // containing two is_set flags. - self.update_with_new_row(&last_row[0..is_set_idx])?; + self.update_with_new_row(&last_row[0..is_set_idx]); } } Ok(()) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index f5bb4fe59b5d7..a38044de02e38 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -44,9 +44,9 @@ use datafusion_expr::Accumulator; use datafusion_physical_expr::{ aggregate::is_order_sensitive, equivalence::{collapse_lex_req, ProjectionMapping}, - expressions::{Column, Max, Min, UnKnownColumn}, - physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering, - LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, + expressions::{Column, FirstValue, LastValue, Max, Min, UnKnownColumn}, + physical_exprs_contains, reverse_order_bys, AggregateExpr, EquivalenceProperties, + LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; use itertools::Itertools; @@ -324,7 +324,7 @@ impl AggregateExec { fn try_new_with_schema( mode: AggregateMode, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + mut aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -347,7 +347,8 @@ impl AggregateExec { .collect::>(); let req = get_aggregate_exprs_requirement( - &aggr_expr, + &new_requirement, + &mut aggr_expr, &group_by, &input_eq_properties, &mode, @@ -896,6 +897,11 @@ fn finer_ordering( eq_properties.get_finer_ordering(existing_req, &aggr_req) } +/// Concatenates the given slices. +fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { + [lhs, rhs].concat() +} + /// Get the common requirement that satisfies all the aggregate expressions. /// /// # Parameters @@ -914,14 +920,64 @@ fn finer_ordering( /// A `LexRequirement` instance, which is the requirement that satisfies all the /// aggregate requirements. Returns an error in case of conflicting requirements. fn get_aggregate_exprs_requirement( - aggr_exprs: &[Arc], + prefix_requirement: &[PhysicalSortRequirement], + aggr_exprs: &mut [Arc], group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, agg_mode: &AggregateMode, ) -> Result { let mut requirement = vec![]; - for aggr_expr in aggr_exprs.iter() { - if let Some(finer_ordering) = + for aggr_expr in aggr_exprs.iter_mut() { + let aggr_req = aggr_expr.order_bys().unwrap_or(&[]); + let reverse_aggr_req = reverse_order_bys(aggr_req); + let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req); + let reverse_aggr_req = + PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req); + if let Some(first_value) = aggr_expr.as_any().downcast_ref::() { + let mut first_value = first_value.clone(); + if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &aggr_req, + )) { + first_value = first_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(first_value) as _; + } else if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &reverse_aggr_req, + )) { + // Converting to LAST_VALUE enables more efficient execution + // given the existing ordering: + let mut last_value = first_value.convert_to_last(); + last_value = last_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(last_value) as _; + } else { + // Requirement is not satisfied with existing ordering. + first_value = first_value.with_requirement_satisfied(false); + *aggr_expr = Arc::new(first_value) as _; + } + } else if let Some(last_value) = aggr_expr.as_any().downcast_ref::() { + let mut last_value = last_value.clone(); + if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &aggr_req, + )) { + last_value = last_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(last_value) as _; + } else if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &reverse_aggr_req, + )) { + // Converting to FIRST_VALUE enables more efficient execution + // given the existing ordering: + let mut first_value = last_value.convert_to_first(); + first_value = first_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(first_value) as _; + } else { + // Requirement is not satisfied with existing ordering. + last_value = last_value.with_requirement_satisfied(false); + *aggr_expr = Arc::new(last_value) as _; + } + } else if let Some(finer_ordering) = finer_ordering(&requirement, aggr_expr, group_by, eq_properties, agg_mode) { requirement = finer_ordering; @@ -2071,7 +2127,7 @@ mod tests { options: options1, }, ]; - let aggr_exprs = order_by_exprs + let mut aggr_exprs = order_by_exprs .into_iter() .map(|order_by_expr| { Arc::new(OrderSensitiveArrayAgg::new( @@ -2086,7 +2142,8 @@ mod tests { .collect::>(); let group_by = PhysicalGroupBy::new_single(vec![]); let res = get_aggregate_exprs_requirement( - &aggr_exprs, + &[], + &mut aggr_exprs, &group_by, &eq_properties, &AggregateMode::Partial, diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index bbf21e135fe4a..b09ff79e88d50 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -2508,7 +2508,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales ----TableScan: sales_global projection=[country, amount] physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ----SortExec: expr=[amount@1 DESC] ------MemoryExec: partitions=1, partition_sizes=[1] @@ -2539,7 +2539,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales ----TableScan: sales_global projection=[country, amount] physical_plan ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] ----SortExec: expr=[amount@1 ASC NULLS LAST] ------MemoryExec: partitions=1, partition_sizes=[1] @@ -2571,7 +2571,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal ----TableScan: sales_global projection=[country, amount] physical_plan ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)] ----SortExec: expr=[amount@1 ASC NULLS LAST] ------MemoryExec: partitions=1, partition_sizes=[1] @@ -2636,7 +2636,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal ------TableScan: sales_global projection=[country, ts, amount] physical_plan ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] ---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] +--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)] ----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort @@ -2988,7 +2988,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST] ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] --------CoalesceBatchesExec: target_batch_size=4 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 -------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] --------------SortExec: expr=[amount@1 DESC] ----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ------------------MemoryExec: partitions=1, partition_sizes=[1] @@ -3631,10 +3631,10 @@ Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_tab ----TableScan: multiple_ordered_table projection=[a, c, d] physical_plan ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c] ---AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] +--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] ----CoalesceBatchesExec: target_batch_size=2 ------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 ---------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] +--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true