Merged
12 changes: 5 additions & 7 deletions datafusion/common/src/utils/mod.rs

@@ -98,7 +98,7 @@ pub fn get_record_batch_at_indices(
     record_batch: &RecordBatch,
     indices: &PrimitiveArray<UInt32Type>,
 ) -> Result<RecordBatch> {
-    let new_columns = get_arrayref_at_indices(record_batch.columns(), indices)?;
+    let new_columns = take_arrays(record_batch.columns(), indices)?;
     RecordBatch::try_new_with_options(
         record_batch.schema(),
         new_columns,
@@ -291,10 +291,7 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
 }

 /// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at the `indices`.
-pub fn get_arrayref_at_indices(
-    arrays: &[ArrayRef],
-    indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<ArrayRef>> {
+pub fn take_arrays(arrays: &[ArrayRef], indices: &dyn Array) -> Result<Vec<ArrayRef>> {
Contributor:

Just wondering how it differs from arrow::compute::take; it looks like the same thing, but for 2-dim arrays instead of a single array.

What I'm thinking is: should we move this method to an arrow-rs kernel?

Contributor Author:

Exactly, this util just abstracts away the same outer iteration over compute::take. If the community thinks it is beneficial, we can move it to arrow-rs; I think this pattern is common enough to justify that.

Contributor:

I think it makes sense to move it to arrow, and maybe add a note in the docs for RecordBatch pointing people at it.

It is common enough that it would be nice not to have to write it in each downstream crate.
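For context, a minimal sketch of the pattern under discussion, assuming a recent arrow-rs in which compute::take accepts &dyn Array indices (the helper name is illustrative, not the PR's exact code):

    // Sketch: apply arrow's `take` kernel to every array in a slice,
    // gathering the rows at `indices` from each column.
    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int32Array, UInt32Array};
    use arrow::compute::take;
    use arrow::error::Result;

    fn take_arrays_sketch(arrays: &[ArrayRef], indices: &dyn Array) -> Result<Vec<ArrayRef>> {
        arrays
            .iter()
            .map(|array| take(array.as_ref(), indices, None))
            .collect()
    }

    fn main() -> Result<()> {
        let col: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));
        let indices = UInt32Array::from(vec![3_u32, 1]);
        // Rows 3 and 1 of every column, in that order.
        let taken = take_arrays_sketch(&[col], &indices)?;
        assert_eq!(taken[0].len(), 2);
        Ok(())
    }

This is exactly the outer iteration the comments above describe; moving it behind an arrow-rs kernel would save each downstream crate from rewriting the same map-and-collect.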

     arrays
         .iter()
         .map(|array| {

@@ -1023,8 +1020,9 @@ mod tests {
         vec![2, 4],
     ];
     for row_indices in row_indices_vec {
-        let indices = PrimitiveArray::from_iter_values(row_indices.iter().cloned());
-        let chunk = get_arrayref_at_indices(&arrays, &indices)?;
+        let indices: PrimitiveArray<UInt32Type> =
+            PrimitiveArray::from_iter_values(row_indices.iter().cloned());
+        let chunk = take_arrays(&arrays, &indices)?;
         for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) {
             for (idx, orig_idx) in row_indices.iter().enumerate() {
                 let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?;
@@ -29,8 +29,7 @@ use arrow::{
     datatypes::UInt32Type,
 };
 use datafusion_common::{
-    arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError, Result,
-    ScalarValue,
+    arrow_datafusion_err, utils::take_arrays, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr_common::accumulator::Accumulator;
 use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
@@ -239,7 +238,7 @@ impl GroupsAccumulatorAdapter {
         // reorder the values and opt_filter by batch_indices so that
         // all values for each group are contiguous, then invoke the
         // accumulator once per group with values
-        let values = get_arrayref_at_indices(values, &batch_indices)?;
+        let values = take_arrays(values, &batch_indices)?;
         let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;

         // invoke each accumulator with the appropriate rows, first
6 changes: 3 additions & 3 deletions datafusion/functions-aggregate/src/first_last.rs

@@ -24,7 +24,7 @@ use std::sync::Arc;
 use arrow::array::{ArrayRef, AsArray, BooleanArray};
 use arrow::compute::{self, lexsort_to_indices, SortColumn};
 use arrow::datatypes::{DataType, Field};
-use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
+use datafusion_common::utils::{compare_rows, get_row_at_idx, take_arrays};
 use datafusion_common::{
     arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
 };
@@ -310,7 +310,7 @@ impl Accumulator for FirstValueAccumulator {
             filtered_states
         } else {
             let indices = lexsort_to_indices(&sort_cols, None)?;
-            get_arrayref_at_indices(&filtered_states, &indices)?
+            take_arrays(&filtered_states, &indices)?
         };
         if !ordered_states[0].is_empty() {
             let first_row = get_row_at_idx(&ordered_states, 0)?;
@@ -613,7 +613,7 @@ impl Accumulator for LastValueAccumulator {
             filtered_states
         } else {
             let indices = lexsort_to_indices(&sort_cols, None)?;
-            get_arrayref_at_indices(&filtered_states, &indices)?
+            take_arrays(&filtered_states, &indices)?
         };

         if !ordered_states[0].is_empty() {
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/sorts/partial_sort.rs

@@ -104,7 +104,7 @@ impl PartialSortExec {
         input: Arc<dyn ExecutionPlan>,
         common_prefix_length: usize,
     ) -> Self {
-        assert!(common_prefix_length > 0);
+        debug_assert!(common_prefix_length > 0);
         let preserve_partitioning = false;
         let cache = Self::compute_properties(&input, expr.clone(), preserve_partitioning);
         Self {
@@ -289,7 +289,7 @@ impl ExecutionPlan for PartialSortExec {

         // Make sure common prefix length is larger than 0
         // Otherwise, we should use SortExec.
-        assert!(self.common_prefix_length > 0);
+        debug_assert!(self.common_prefix_length > 0);

         Ok(Box::pin(PartialSortStream {
             input,
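Since the only change in this file swaps assert! for debug_assert!, it is worth noting what that buys: the precondition still guards dev and test builds but is compiled out of release builds. A standalone illustration (not the PR's code):

    // debug_assert! runs only when debug assertions are enabled
    // (the default for dev/test profiles); assert! always runs.
    fn check_prefix(common_prefix_length: usize) {
        // Compiled out under `cargo build --release`.
        debug_assert!(common_prefix_length > 0);
    }

    fn main() {
        check_prefix(1); // fine in any profile
        // In a release build, check_prefix(0) would NOT panic,
        // because the debug_assert is removed at compile time.
    }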
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/sorts/sort.rs

@@ -46,7 +46,7 @@ use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, SortField};
 use arrow_array::{Array, RecordBatchOptions, UInt32Array};
 use arrow_schema::DataType;
-use datafusion_common::utils::get_arrayref_at_indices;
+use datafusion_common::utils::take_arrays;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -617,7 +617,7 @@ pub fn sort_batch(
         lexsort_to_indices(&sort_columns, fetch)?
     };

-    let columns = get_arrayref_at_indices(batch.columns(), &indices)?;
+    let columns = take_arrays(batch.columns(), &indices)?;

     let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
     Ok(RecordBatch::try_new_with_options(
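sort_batch shows the broader pattern this PR touches in several places: compute a permutation with lexsort_to_indices, then gather every column with the same indices. A self-contained sketch of that flow, assuming only public arrow compute APIs (variable names are illustrative):

    // Sketch: sort several columns together by first computing sorted
    // row indices from the key column, then `take`-ing every column.
    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
    use arrow::compute::{lexsort_to_indices, take, SortColumn};
    use arrow::error::Result;

    fn main() -> Result<()> {
        let keys: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2]));
        let vals: ArrayRef = Arc::new(StringArray::from(vec!["c", "a", "b"]));

        // `indices` is a UInt32Array permutation that sorts `keys`.
        let indices = lexsort_to_indices(
            &[SortColumn { values: keys.clone(), options: None }],
            None, // no fetch/limit
        )?;

        // Reorder every column with the same permutation.
        let sorted: Vec<ArrayRef> = [keys, vals]
            .iter()
            .map(|array| take(array.as_ref(), &indices, None))
            .collect::<Result<_>>()?;

        assert_eq!(sorted[0].len(), 3); // keys now 1, 2, 3; vals "a", "b", "c"
        Ok(())
    }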
@@ -49,8 +49,8 @@ use arrow::{
 use datafusion_common::hash_utils::create_hashes;
 use datafusion_common::stats::Precision;
 use datafusion_common::utils::{
-    evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
-    get_record_batch_at_indices, get_row_at_idx,
+    evaluate_partition_ranges, get_at_indices, get_record_batch_at_indices,
+    get_row_at_idx, take_arrays,
 };
 use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
@@ -542,7 +542,7 @@ impl PartitionSearcher for LinearSearch {
         // We should emit columns according to row index ordering.
         let sorted_indices = sort_to_indices(&all_indices, None, None)?;
         // Construct new column according to row ordering. This fixes ordering
-        get_arrayref_at_indices(&new_columns, &sorted_indices).map(Some)
+        take_arrays(&new_columns, &sorted_indices).map(Some)
     }

     fn evaluate_partition_batches(