Skip to content

Commit 7a34147

Browse files
authored
replace take_array with arrow util (#13013)
1 parent 12568bf commit 7a34147

6 files changed

Lines changed: 19 additions & 71 deletions

File tree

datafusion/common/src/utils/mod.rs

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ use crate::error::{_internal_datafusion_err, _internal_err};
2626
use crate::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
2727
use arrow::array::{ArrayRef, PrimitiveArray};
2828
use arrow::buffer::OffsetBuffer;
29-
use arrow::compute;
30-
use arrow::compute::{partition, SortColumn, SortOptions};
29+
use arrow::compute::{partition, take_arrays, SortColumn, SortOptions};
3130
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
3231
use arrow::record_batch::RecordBatch;
3332
use arrow_array::cast::AsArray;
@@ -98,7 +97,7 @@ pub fn get_record_batch_at_indices(
9897
record_batch: &RecordBatch,
9998
indices: &PrimitiveArray<UInt32Type>,
10099
) -> Result<RecordBatch> {
101-
let new_columns = take_arrays(record_batch.columns(), indices)?;
100+
let new_columns = take_arrays(record_batch.columns(), indices, None)?;
102101
RecordBatch::try_new_with_options(
103102
record_batch.schema(),
104103
new_columns,
@@ -290,24 +289,6 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
290289
Ok(idents)
291290
}
292291

293-
/// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at the `indices`.
294-
///
295-
/// TODO: use implementation in arrow-rs when available:
296-
/// <https://github.com/apache/arrow-rs/pull/6475>
297-
pub fn take_arrays(arrays: &[ArrayRef], indices: &dyn Array) -> Result<Vec<ArrayRef>> {
298-
arrays
299-
.iter()
300-
.map(|array| {
301-
compute::take(
302-
array.as_ref(),
303-
indices,
304-
None, // None: no index check
305-
)
306-
.map_err(|e| arrow_datafusion_err!(e))
307-
})
308-
.collect()
309-
}
310-
311292
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
312293
parse_identifiers(s)
313294
.unwrap_or_default()
@@ -1003,40 +984,6 @@ mod tests {
1003984
Ok(())
1004985
}
1005986

1006-
#[test]
1007-
fn test_take_arrays() -> Result<()> {
1008-
let arrays: Vec<ArrayRef> = vec![
1009-
Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])),
1010-
Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])),
1011-
Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])),
1012-
Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])),
1013-
];
1014-
1015-
let row_indices_vec: Vec<Vec<u32>> = vec![
1016-
// Get rows 0 and 1
1017-
vec![0, 1],
1018-
// Get rows 0 and 2
1019-
vec![0, 2],
1020-
// Get rows 1 and 3
1021-
vec![1, 3],
1022-
// Get rows 2 and 4
1023-
vec![2, 4],
1024-
];
1025-
for row_indices in row_indices_vec {
1026-
let indices: PrimitiveArray<UInt32Type> =
1027-
PrimitiveArray::from_iter_values(row_indices.iter().cloned());
1028-
let chunk = take_arrays(&arrays, &indices)?;
1029-
for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) {
1030-
for (idx, orig_idx) in row_indices.iter().enumerate() {
1031-
let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?;
1032-
let res2 = ScalarValue::try_from_array(arr_chunk, idx)?;
1033-
assert_eq!(res1, res2);
1034-
}
1035-
}
1036-
}
1037-
Ok(())
1038-
}
1039-
1040987
#[test]
1041988
fn test_get_at_indices() -> Result<()> {
1042989
let in_vec = vec![1, 2, 3, 4, 5, 6, 7];

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,10 @@ use arrow::array::new_empty_array;
2727
use arrow::{
2828
array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
2929
compute,
30+
compute::take_arrays,
3031
datatypes::UInt32Type,
3132
};
32-
use datafusion_common::{
33-
arrow_datafusion_err, utils::take_arrays, DataFusionError, Result, ScalarValue,
34-
};
33+
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
3534
use datafusion_expr_common::accumulator::Accumulator;
3635
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
3736

@@ -239,7 +238,7 @@ impl GroupsAccumulatorAdapter {
239238
// reorder the values and opt_filter by batch_indices so that
240239
// all values for each group are contiguous, then invoke the
241240
// accumulator once per group with values
242-
let values = take_arrays(values, &batch_indices)?;
241+
let values = take_arrays(values, &batch_indices, None)?;
243242
let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
244243

245244
// invoke each accumulator with the appropriate rows, first

datafusion/functions-aggregate/src/first_last.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ use std::fmt::Debug;
2222
use std::sync::{Arc, OnceLock};
2323

2424
use arrow::array::{ArrayRef, AsArray, BooleanArray};
25-
use arrow::compute::{self, lexsort_to_indices, SortColumn};
25+
use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn};
2626
use arrow::datatypes::{DataType, Field};
27-
use datafusion_common::utils::{compare_rows, get_row_at_idx, take_arrays};
27+
use datafusion_common::utils::{compare_rows, get_row_at_idx};
2828
use datafusion_common::{
2929
arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
3030
};
@@ -340,7 +340,7 @@ impl Accumulator for FirstValueAccumulator {
340340
filtered_states
341341
} else {
342342
let indices = lexsort_to_indices(&sort_cols, None)?;
343-
take_arrays(&filtered_states, &indices)?
343+
take_arrays(&filtered_states, &indices, None)?
344344
};
345345
if !ordered_states[0].is_empty() {
346346
let first_row = get_row_at_idx(&ordered_states, 0)?;
@@ -670,7 +670,7 @@ impl Accumulator for LastValueAccumulator {
670670
filtered_states
671671
} else {
672672
let indices = lexsort_to_indices(&sort_cols, None)?;
673-
take_arrays(&filtered_states, &indices)?
673+
take_arrays(&filtered_states, &indices, None)?
674674
};
675675

676676
if !ordered_states[0].is_empty() {

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
3838
use crate::stream::RecordBatchStreamAdapter;
3939
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
4040

41+
use arrow::compute::take_arrays;
4142
use arrow::datatypes::{SchemaRef, UInt32Type};
4243
use arrow::record_batch::RecordBatch;
4344
use arrow_array::{PrimitiveArray, RecordBatchOptions};
44-
use datafusion_common::utils::{take_arrays, transpose};
45+
use datafusion_common::utils::transpose;
4546
use datafusion_common::{not_impl_err, DataFusionError, Result};
4647
use datafusion_common_runtime::SpawnedTask;
4748
use datafusion_execution::memory_pool::MemoryConsumer;
@@ -300,7 +301,7 @@ impl BatchPartitioner {
300301
let _timer = partitioner_timer.timer();
301302

302303
// Produce batches based on indices
303-
let columns = take_arrays(batch.columns(), &indices)?;
304+
let columns = take_arrays(batch.columns(), &indices, None)?;
304305

305306
let mut options = RecordBatchOptions::new();
306307
options = options.with_row_count(Some(indices.len()));

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,12 @@ use crate::{
4040
SendableRecordBatchStream, Statistics,
4141
};
4242

43-
use arrow::compute::{concat_batches, lexsort_to_indices, SortColumn};
43+
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
4444
use arrow::datatypes::SchemaRef;
4545
use arrow::record_batch::RecordBatch;
4646
use arrow::row::{RowConverter, SortField};
4747
use arrow_array::{Array, RecordBatchOptions, UInt32Array};
4848
use arrow_schema::DataType;
49-
use datafusion_common::utils::take_arrays;
5049
use datafusion_common::{internal_err, Result};
5150
use datafusion_execution::disk_manager::RefCountedTempFile;
5251
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -618,7 +617,7 @@ pub fn sort_batch(
618617
lexsort_to_indices(&sort_columns, fetch)?
619618
};
620619

621-
let columns = take_arrays(batch.columns(), &indices)?;
620+
let columns = take_arrays(batch.columns(), &indices, None)?;
622621

623622
let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
624623
Ok(RecordBatch::try_new_with_options(

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ use crate::{
4242
use ahash::RandomState;
4343
use arrow::{
4444
array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
45-
compute::{concat, concat_batches, sort_to_indices},
45+
compute::{concat, concat_batches, sort_to_indices, take_arrays},
4646
datatypes::SchemaRef,
4747
record_batch::RecordBatch,
4848
};
4949
use datafusion_common::hash_utils::create_hashes;
5050
use datafusion_common::stats::Precision;
5151
use datafusion_common::utils::{
5252
evaluate_partition_ranges, get_at_indices, get_record_batch_at_indices,
53-
get_row_at_idx, take_arrays,
53+
get_row_at_idx,
5454
};
5555
use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result};
5656
use datafusion_execution::TaskContext;
@@ -536,7 +536,9 @@ impl PartitionSearcher for LinearSearch {
536536
// We should emit columns according to row index ordering.
537537
let sorted_indices = sort_to_indices(&all_indices, None, None)?;
538538
// Construct new column according to row ordering. This fixes ordering
539-
take_arrays(&new_columns, &sorted_indices).map(Some)
539+
take_arrays(&new_columns, &sorted_indices, None)
540+
.map(Some)
541+
.map_err(|e| arrow_datafusion_err!(e))
540542
}
541543

542544
fn evaluate_partition_batches(

0 commit comments

Comments (0)