Skip to content

Commit add5c9e

Browse files
committed
Return error on non-unique reference
1 parent f92137f commit add5c9e

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

datafusion/physical-plan/src/sorts/stream.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow::array::Array;
2222
use arrow::datatypes::Schema;
2323
use arrow::record_batch::RecordBatch;
2424
use arrow::row::{RowConverter, Rows, SortField};
25-
use datafusion_common::Result;
25+
use datafusion_common::{DataFusionError, Result};
2626
use datafusion_execution::memory_pool::MemoryReservation;
2727
use datafusion_physical_expr_common::sort_expr::LexOrdering;
2828
use futures::stream::{Fuse, StreamExt};
@@ -78,8 +78,7 @@ impl FusedStreams {
7878

7979
/// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`]
8080
/// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`]
81-
/// Note: for optimal performance, keep only the final RowValues in memory
82-
/// before pulling the next batch. This will allow reuse of allocations.
81+
/// Note: this errors
8382
#[derive(Debug)]
8483
pub struct RowCursorStream {
8584
/// Converter to convert output of physical expressions
@@ -142,7 +141,11 @@ impl RowCursorStream {
142141

143142
// At this point, ownership should of this Rows should be unique
144143
let mut rows = Arc::try_unwrap(self.rows[stream_idx][1].take().unwrap())
145-
.unwrap_or_else(|_| self.converter.empty_rows(0, 0));
144+
.map_err(|_| {
145+
DataFusionError::Internal(
146+
"Rows from RowCursorStream is still in use by consumer".to_string(),
147+
)
148+
})?;
146149

147150
rows.clear();
148151

0 commit comments

Comments
 (0)