diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index fb03ceb15c378..64434e7a4a04a 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -40,12 +40,13 @@ use crate::{ SendableRecordBatchStream, Statistics, }; -use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn}; +use arrow::compute::{concat_batches, lexsort_to_indices, SortColumn}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; use arrow_array::{Array, RecordBatchOptions, UInt32Array}; use arrow_schema::DataType; +use datafusion_common::utils::get_arrayref_at_indices; use datafusion_common::{internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -350,12 +351,8 @@ impl ExternalSorter { self.fetch, self.reservation.new_empty(), ) - } else if !self.in_mem_batches.is_empty() { - self.in_mem_sort_stream(self.metrics.baseline.clone()) } else { - Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( - &self.schema, - )))) + self.in_mem_sort_stream(self.metrics.baseline.clone()) } } @@ -500,7 +497,11 @@ impl ExternalSorter { &mut self, metrics: BaselineMetrics, ) -> Result { - assert_ne!(self.in_mem_batches.len(), 0); + if self.in_mem_batches.is_empty() { + return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + &self.schema, + )))); + } // The elapsed compute timer is updated when the value is dropped. // There is no need for an explicit call to drop. @@ -508,7 +509,7 @@ impl ExternalSorter { let _timer = elapsed_compute.timer(); if self.in_mem_batches.len() == 1 { - let batch = self.in_mem_batches.remove(0); + let batch = self.in_mem_batches.swap_remove(0); let reservation = self.reservation.take(); return self.sort_batch_stream(batch, metrics, reservation); } @@ -616,11 +617,7 @@ pub fn sort_batch( lexsort_to_indices(&sort_columns, fetch)? }; - let columns = batch - .columns() - .iter() - .map(|c| take(c.as_ref(), &indices, None)) - .collect::>()?; + let columns = get_arrayref_at_indices(batch.columns(), &indices)?; let options = RecordBatchOptions::new().with_row_count(Some(indices.len())); Ok(RecordBatch::try_new_with_options(