diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 5bf0f08b092a4..def1def9853c8 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -26,8 +26,7 @@ use crate::error::{_internal_datafusion_err, _internal_err}; use crate::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; use arrow::array::{ArrayRef, PrimitiveArray}; use arrow::buffer::OffsetBuffer; -use arrow::compute; -use arrow::compute::{partition, SortColumn, SortOptions}; +use arrow::compute::{partition, take_arrays, SortColumn, SortOptions}; use arrow::datatypes::{Field, SchemaRef, UInt32Type}; use arrow::record_batch::RecordBatch; use arrow_array::cast::AsArray; @@ -98,7 +97,7 @@ pub fn get_record_batch_at_indices( record_batch: &RecordBatch, indices: &PrimitiveArray, ) -> Result { - let new_columns = take_arrays(record_batch.columns(), indices)?; + let new_columns = take_arrays(record_batch.columns(), indices, None)?; RecordBatch::try_new_with_options( record_batch.schema(), new_columns, @@ -290,24 +289,6 @@ pub(crate) fn parse_identifiers(s: &str) -> Result> { Ok(idents) } -/// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at the `indices`. -/// -/// TODO: use implementation in arrow-rs when available: -/// -pub fn take_arrays(arrays: &[ArrayRef], indices: &dyn Array) -> Result> { - arrays - .iter() - .map(|array| { - compute::take( - array.as_ref(), - indices, - None, // None: no index check - ) - .map_err(|e| arrow_datafusion_err!(e)) - }) - .collect() -} - pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec { parse_identifiers(s) .unwrap_or_default() @@ -1003,40 +984,6 @@ mod tests { Ok(()) } - #[test] - fn test_take_arrays() -> Result<()> { - let arrays: Vec = vec![ - Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])), - Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])), - Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])), - Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])), - ]; - - let row_indices_vec: Vec> = vec![ - // Get rows 0 and 1 - vec![0, 1], - // Get rows 0 and 1 - vec![0, 2], - // Get rows 1 and 3 - vec![1, 3], - // Get rows 2 and 4 - vec![2, 4], - ]; - for row_indices in row_indices_vec { - let indices: PrimitiveArray = - PrimitiveArray::from_iter_values(row_indices.iter().cloned()); - let chunk = take_arrays(&arrays, &indices)?; - for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) { - for (idx, orig_idx) in row_indices.iter().enumerate() { - let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?; - let res2 = ScalarValue::try_from_array(arr_chunk, idx)?; - assert_eq!(res1, res2); - } - } - } - Ok(()) - } - #[test] fn test_get_at_indices() -> Result<()> { let in_vec = vec![1, 2, 3, 4, 5, 6, 7]; diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index b03df0224089c..c936c80cbed71 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -27,11 +27,10 @@ use arrow::array::new_empty_array; use arrow::{ array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}, compute, + compute::take_arrays, datatypes::UInt32Type, }; -use datafusion_common::{ - arrow_datafusion_err, utils::take_arrays, DataFusionError, Result, ScalarValue, -}; +use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; use datafusion_expr_common::accumulator::Accumulator; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; @@ -239,7 +238,7 @@ impl GroupsAccumulatorAdapter { // reorder the values and opt_filter by batch_indices so that // all values for each group are contiguous, then invoke the // accumulator once per group with values - let values = take_arrays(values, &batch_indices)?; + let values = take_arrays(values, &batch_indices, None)?; let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; // invoke each accumulator with the appropriate rows, first diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index f6a84c84dcb0b..2a3fc623657a3 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -22,9 +22,9 @@ use std::fmt::Debug; use std::sync::{Arc, OnceLock}; use arrow::array::{ArrayRef, AsArray, BooleanArray}; -use arrow::compute::{self, lexsort_to_indices, SortColumn}; +use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn}; use arrow::datatypes::{DataType, Field}; -use datafusion_common::utils::{compare_rows, get_row_at_idx, take_arrays}; +use datafusion_common::utils::{compare_rows, get_row_at_idx}; use datafusion_common::{ arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue, }; @@ -340,7 +340,7 @@ impl Accumulator for FirstValueAccumulator { filtered_states } else { let indices = lexsort_to_indices(&sort_cols, None)?; - take_arrays(&filtered_states, &indices)? + take_arrays(&filtered_states, &indices, None)? }; if !ordered_states[0].is_empty() { let first_row = get_row_at_idx(&ordered_states, 0)?; @@ -670,7 +670,7 @@ impl Accumulator for LastValueAccumulator { filtered_states } else { let indices = lexsort_to_indices(&sort_cols, None)?; - take_arrays(&filtered_states, &indices)? + take_arrays(&filtered_states, &indices, None)? }; if !ordered_states[0].is_empty() { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 902d9f4477bc8..90e62d6f11f82 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -38,10 +38,11 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; +use arrow::compute::take_arrays; use arrow::datatypes::{SchemaRef, UInt32Type}; use arrow::record_batch::RecordBatch; use arrow_array::{PrimitiveArray, RecordBatchOptions}; -use datafusion_common::utils::{take_arrays, transpose}; +use datafusion_common::utils::transpose; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::MemoryConsumer; @@ -300,7 +301,7 @@ impl BatchPartitioner { let _timer = partitioner_timer.timer(); // Produce batches based on indices - let columns = take_arrays(batch.columns(), &indices)?; + let columns = take_arrays(batch.columns(), &indices, None)?; let mut options = RecordBatchOptions::new(); options = options.with_row_count(Some(indices.len())); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 5d86c2183b9e6..8e13a2e07e490 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -40,13 +40,12 @@ use crate::{ SendableRecordBatchStream, Statistics, }; -use arrow::compute::{concat_batches, lexsort_to_indices, SortColumn}; +use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; use arrow_array::{Array, RecordBatchOptions, UInt32Array}; use arrow_schema::DataType; -use datafusion_common::utils::take_arrays; use datafusion_common::{internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -618,7 +617,7 @@ pub fn sort_batch( lexsort_to_indices(&sort_columns, fetch)? }; - let columns = take_arrays(batch.columns(), &indices)?; + let columns = take_arrays(batch.columns(), &indices, None)?; let options = RecordBatchOptions::new().with_row_count(Some(indices.len())); Ok(RecordBatch::try_new_with_options( diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 4a4c940b22e2f..6254ae139a005 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -42,7 +42,7 @@ use crate::{ use ahash::RandomState; use arrow::{ array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder}, - compute::{concat, concat_batches, sort_to_indices}, + compute::{concat, concat_batches, sort_to_indices, take_arrays}, datatypes::SchemaRef, record_batch::RecordBatch, }; @@ -50,7 +50,7 @@ use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; use datafusion_common::utils::{ evaluate_partition_ranges, get_at_indices, get_record_batch_at_indices, - get_row_at_idx, take_arrays, + get_row_at_idx, }; use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result}; use datafusion_execution::TaskContext; @@ -536,7 +536,9 @@ impl PartitionSearcher for LinearSearch { // We should emit columns according to row index ordering. let sorted_indices = sort_to_indices(&all_indices, None, None)?; // Construct new column according to row ordering. This fixes ordering - take_arrays(&new_columns, &sorted_indices).map(Some) + take_arrays(&new_columns, &sorted_indices, None) + .map(Some) + .map_err(|e| arrow_datafusion_err!(e)) } fn evaluate_partition_batches(