From 93fc72b861ca9ff6dfd61ce4d9355c1ac830c3d8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 10 Nov 2025 14:26:51 -0500 Subject: [PATCH] Rework RowSelectionCursor to use enums --- parquet/src/arrow/arrow_reader/mod.rs | 131 ++++++------ parquet/src/arrow/arrow_reader/read_plan.rs | 51 +++-- parquet/src/arrow/arrow_reader/selection.rs | 216 ++++++++++---------- 3 files changed, 206 insertions(+), 192 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index bae44f9acacd..af5ee7db4cca 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1072,83 +1072,80 @@ impl ParquetRecordBatchReader { fn next_inner(&mut self) -> Result> { let mut read_records = 0; let batch_size = self.batch_size(); - match self.read_plan.selection_mut() { - Some(selection_cursor) => { - if selection_cursor.is_mask_backed() { - // Stream the record batch reader using contiguous segments of the selection - // mask, avoiding the need to materialize intermediate `RowSelector` ranges. - while !selection_cursor.is_empty() { - let Some(mask_chunk) = selection_cursor.next_mask_chunk(batch_size) else { - return Ok(None); - }; - - if mask_chunk.initial_skip > 0 { - let skipped = - self.array_reader.skip_records(mask_chunk.initial_skip)?; - if skipped != mask_chunk.initial_skip { - return Err(general_err!( - "failed to skip rows, expected {}, got {}", - mask_chunk.initial_skip, - skipped - )); - } - } - - if mask_chunk.chunk_rows == 0 { - if selection_cursor.is_empty() && mask_chunk.selected_rows == 0 { - return Ok(None); - } - continue; - } - - let mask = selection_cursor - .mask_values_for(&mask_chunk) - .ok_or_else(|| general_err!("row selection mask out of bounds"))?; + match self.read_plan.row_selection_cursor_mut() { + RowSelectionCursor::Mask(mask_cursor) => { + // Stream the record batch reader using contiguous segments of the selection + // mask, avoiding the need to materialize intermediate `RowSelector` ranges. + while !mask_cursor.is_empty() { + let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else { + return Ok(None); + }; - let read = self.array_reader.read_records(mask_chunk.chunk_rows)?; - if read == 0 { + if mask_chunk.initial_skip > 0 { + let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?; + if skipped != mask_chunk.initial_skip { return Err(general_err!( - "reached end of column while expecting {} rows", - mask_chunk.chunk_rows + "failed to skip rows, expected {}, got {}", + mask_chunk.initial_skip, + skipped )); } - if read != mask_chunk.chunk_rows { - return Err(general_err!( - "insufficient rows read from array reader - expected {}, got {}", - mask_chunk.chunk_rows, - read - )); + } + + if mask_chunk.chunk_rows == 0 { + if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 { + return Ok(None); } + continue; + } - let array = self.array_reader.consume_batch()?; - // The column reader exposes the projection as a struct array; convert this - // into a record batch before applying the boolean filter mask. - let struct_array = array.as_struct_opt().ok_or_else(|| { - ArrowError::ParquetError( - "Struct array reader should return struct array".to_string(), - ) - })?; + let mask = mask_cursor.mask_values_for(&mask_chunk)?; - let filtered_batch = - filter_record_batch(&RecordBatch::from(struct_array), &mask)?; + let read = self.array_reader.read_records(mask_chunk.chunk_rows)?; + if read == 0 { + return Err(general_err!( + "reached end of column while expecting {} rows", + mask_chunk.chunk_rows + )); + } + if read != mask_chunk.chunk_rows { + return Err(general_err!( + "insufficient rows read from array reader - expected {}, got {}", + mask_chunk.chunk_rows, + read + )); + } - if filtered_batch.num_rows() != mask_chunk.selected_rows { - return Err(general_err!( - "filtered rows mismatch selection - expected {}, got {}", - mask_chunk.selected_rows, - filtered_batch.num_rows() - )); - } + let array = self.array_reader.consume_batch()?; + // The column reader exposes the projection as a struct array; convert this + // into a record batch before applying the boolean filter mask. + let struct_array = array.as_struct_opt().ok_or_else(|| { + ArrowError::ParquetError( + "Struct array reader should return struct array".to_string(), + ) + })?; - if filtered_batch.num_rows() == 0 { - continue; - } + let filtered_batch = + filter_record_batch(&RecordBatch::from(struct_array), &mask)?; + + if filtered_batch.num_rows() != mask_chunk.selected_rows { + return Err(general_err!( + "filtered rows mismatch selection - expected {}, got {}", + mask_chunk.selected_rows, + filtered_batch.num_rows() + )); + } - return Ok(Some(filtered_batch)); + if filtered_batch.num_rows() == 0 { + continue; } + + return Ok(Some(filtered_batch)); } - while read_records < batch_size && !selection_cursor.is_empty() { - let front = selection_cursor.next_selector(); + } + RowSelectionCursor::Selectors(selectors_cursor) => { + while read_records < batch_size && !selectors_cursor.is_empty() { + let front = selectors_cursor.next_selector(); if front.skip { let skipped = self.array_reader.skip_records(front.row_count)?; @@ -1174,7 +1171,7 @@ impl ParquetRecordBatchReader { Some(remaining) if remaining != 0 => { // if page row count less than batch_size we must set batch size to page row count. // add check avoid dead loop - selection_cursor.return_selector(RowSelector::select(remaining)); + selectors_cursor.return_selector(RowSelector::select(remaining)); need_read } _ => front.row_count, @@ -1185,7 +1182,7 @@ impl ParquetRecordBatchReader { }; } } - None => { + RowSelectionCursor::All => { self.array_reader.read_records(batch_size)?; } }; diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 5b5191115f80..d0cbe434c07d 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -26,6 +26,7 @@ use crate::arrow::arrow_reader::{ use crate::errors::{ParquetError, Result}; use arrow_array::Array; use arrow_select::filter::prep_null_mask_filter; +use std::collections::VecDeque; /// A builder for [`ReadPlan`] #[derive(Clone, Debug)] @@ -89,6 +90,8 @@ impl ReadPlanBuilder { } /// Returns the preferred [`RowSelectionStrategy`] for materialising the current selection. + /// + /// Guarantees to return either `Selectors` or `Mask`, never `Auto`. pub fn preferred_selection_strategy(&self) -> RowSelectionStrategy { match self.selection_strategy { RowSelectionStrategy::Selectors => RowSelectionStrategy::Selectors, @@ -167,25 +170,35 @@ impl ReadPlanBuilder { if !self.selects_any() { self.selection = Some(RowSelection::from(vec![])); } - let selection_strategy = match self.selection_strategy { - RowSelectionStrategy::Auto { .. } => self.preferred_selection_strategy(), - strategy => strategy, - }; + + // Preferred strategy must not be Auto + let selection_strategy = self.preferred_selection_strategy(); + let Self { batch_size, selection, selection_strategy: _, } = self; - let selection = selection.map(|s| { - let trimmed = s.trim(); - let selectors: Vec = trimmed.into(); - RowSelectionCursor::new(selectors, selection_strategy) - }); + let selection = selection.map(|s| s.trim()); + + let row_selection_cursor = selection + .map(|s| { + let trimmed = s.trim(); + let selectors: Vec = trimmed.into(); + match selection_strategy { + RowSelectionStrategy::Mask => { + RowSelectionCursor::new_mask_from_selectors(selectors) + } + RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors), + RowSelectionStrategy::Auto { .. } => unreachable!(), + } + }) + .unwrap_or(RowSelectionCursor::new_all()); ReadPlan { batch_size, - selection, + row_selection_cursor, } } } @@ -283,13 +296,23 @@ pub struct ReadPlan { /// The number of rows to read in each batch batch_size: usize, /// Row ranges to be selected from the data source - selection: Option, + row_selection_cursor: RowSelectionCursor, } impl ReadPlan { - /// Returns a mutable reference to the selection, if any - pub fn selection_mut(&mut self) -> Option<&mut RowSelectionCursor> { - self.selection.as_mut() + /// Returns a mutable reference to the selection selectors, if any + #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")] + pub fn selection_mut(&mut self) -> Option<&mut VecDeque> { + if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor { + Some(selectors_cursor.selectors_mut()) + } else { + None + } + } + + /// Returns a mutable reference to the row selection cursor + pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor { + &mut self.row_selection_cursor } /// Return the number of rows to read in each output batch diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index ff975e2bc42d..5fcf494454fd 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use crate::arrow::ProjectionMask; +use crate::errors::ParquetError; +use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use arrow_array::{Array, BooleanArray}; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::filter::SlicesIterator; @@ -22,9 +25,6 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; -use crate::arrow::ProjectionMask; -use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; - /// Strategy for materialising [`RowSelection`] during execution. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum RowSelectionStrategy { @@ -751,116 +751,27 @@ fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelec iter.collect() } -/// Cursor for iterating a [`RowSelection`] during execution within a -/// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan). +/// Cursor for iterating a mask-backed [`RowSelection`] /// -/// This keeps per-reader state such as the current position and delegates the -/// actual storage strategy to the internal `RowSelectionBacking`. +/// This is best for dense selections where there are many small skips +/// or selections. For example, selecting every other row. #[derive(Debug)] -pub struct RowSelectionCursor { - /// Backing storage describing how the selection is materialised - storage: RowSelectionBacking, +pub struct MaskCursor { + mask: BooleanBuffer, /// Current absolute offset into the selection position: usize, } -/// Backing storage that powers [`RowSelectionCursor`]. -/// -/// The cursor either walks a boolean mask (dense representation) or a queue -/// of [`RowSelector`] ranges (sparse representation). -#[derive(Debug)] -pub enum RowSelectionBacking { - Mask(BooleanBuffer), - Selectors(VecDeque), -} - -/// Result of computing the next chunk to read when using a bitmap mask -#[derive(Debug)] -pub struct MaskChunk { - /// Number of leading rows to skip before reaching selected rows - pub initial_skip: usize, - /// Total rows covered by this chunk (selected + skipped) - pub chunk_rows: usize, - /// Rows actually selected within the chunk - pub selected_rows: usize, - /// Starting offset within the mask where the chunk begins - pub mask_start: usize, -} - -impl RowSelectionCursor { - /// Create a cursor, choosing an efficient backing representation - pub(crate) fn new(selectors: Vec, strategy: RowSelectionStrategy) -> Self { - let storage = match strategy { - RowSelectionStrategy::Mask => { - RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors)) - } - RowSelectionStrategy::Selectors => RowSelectionBacking::Selectors(selectors.into()), - RowSelectionStrategy::Auto { .. } => { - panic!("RowSelectionStrategy::Auto must be resolved before creating cursor") - } - }; - - Self { - storage, - position: 0, - } - } - +impl MaskCursor { /// Returns `true` when no further rows remain pub fn is_empty(&self) -> bool { - match &self.storage { - RowSelectionBacking::Mask(mask) => self.position >= mask.len(), - RowSelectionBacking::Selectors(selectors) => selectors.is_empty(), - } - } - - /// Current position within the overall selection - pub fn position(&self) -> usize { - self.position - } - - /// Return the next [`RowSelector`] when using the sparse representation - pub fn next_selector(&mut self) -> RowSelector { - match &mut self.storage { - RowSelectionBacking::Selectors(selectors) => { - let selector = selectors.pop_front().unwrap(); - self.position += selector.row_count; - selector - } - RowSelectionBacking::Mask(_) => { - unreachable!("next_selector called for mask-based RowSelectionCursor") - } - } - } - - /// Return a selector to the front, rewinding the position (sparse-only) - pub fn return_selector(&mut self, selector: RowSelector) { - match &mut self.storage { - RowSelectionBacking::Selectors(selectors) => { - self.position = self.position.saturating_sub(selector.row_count); - selectors.push_front(selector); - } - RowSelectionBacking::Mask(_) => { - unreachable!("return_selector called for mask-based RowSelectionCursor") - } - } - } - - /// Returns `true` if the cursor is backed by a boolean mask - pub fn is_mask_backed(&self) -> bool { - matches!(self.storage, RowSelectionBacking::Mask(_)) + self.position >= self.mask.len() } /// Advance through the mask representation, producing the next chunk summary pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option { - if !self.is_mask_backed() { - unreachable!("next_mask_chunk called for selector-based RowSelectionCursor") - } let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = { - let mask = match &self.storage { - RowSelectionBacking::Mask(mask) => mask, - RowSelectionBacking::Selectors(_) => return None, - }; + let mask = &self.mask; if self.position >= mask.len() { return None; @@ -904,18 +815,101 @@ impl RowSelectionCursor { } /// Materialise the boolean values for a mask-backed chunk - pub fn mask_values_for(&self, chunk: &MaskChunk) -> Option { - match &self.storage { - RowSelectionBacking::Mask(mask) => { - if chunk.mask_start.saturating_add(chunk.chunk_rows) > mask.len() { - return None; - } - Some(BooleanArray::from( - mask.slice(chunk.mask_start, chunk.chunk_rows), - )) - } - RowSelectionBacking::Selectors(_) => None, + pub fn mask_values_for(&self, chunk: &MaskChunk) -> Result { + if chunk.mask_start.saturating_add(chunk.chunk_rows) > self.mask.len() { + return Err(ParquetError::General( + "Internal Error: MaskChunk exceeds mask length".to_string(), + )); } + Ok(BooleanArray::from( + self.mask.slice(chunk.mask_start, chunk.chunk_rows), + )) + } +} + +/// Cursor for iterating a selector-backed [`RowSelection`] +/// +/// This is best for sparse selections where large contiguous +/// blocks of rows are selected or skipped. +#[derive(Debug)] +pub struct SelectorsCursor { + selectors: VecDeque, + /// Current absolute offset into the selection + position: usize, +} + +impl SelectorsCursor { + /// Returns `true` when no further rows remain + pub fn is_empty(&self) -> bool { + self.selectors.is_empty() + } + + pub(crate) fn selectors_mut(&mut self) -> &mut VecDeque { + &mut self.selectors + } + + /// Return the next [`RowSelector`] + pub(crate) fn next_selector(&mut self) -> RowSelector { + let selector = self.selectors.pop_front().unwrap(); + self.position += selector.row_count; + selector + } + + /// Return a selector to the front, rewinding the position + pub(crate) fn return_selector(&mut self, selector: RowSelector) { + self.position = self.position.saturating_sub(selector.row_count); + self.selectors.push_front(selector); + } +} + +/// Result of computing the next chunk to read when using a [`MaskCursor`] +#[derive(Debug)] +pub struct MaskChunk { + /// Number of leading rows to skip before reaching selected rows + pub initial_skip: usize, + /// Total rows covered by this chunk (selected + skipped) + pub chunk_rows: usize, + /// Rows actually selected within the chunk + pub selected_rows: usize, + /// Starting offset within the mask where the chunk begins + pub mask_start: usize, +} + +/// Cursor for iterating a [`RowSelection`] during execution within a +/// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan). +/// +/// This keeps per-reader state such as the current position and delegates the +/// actual storage strategy to the internal `RowSelectionBacking`. +#[derive(Debug)] +pub enum RowSelectionCursor { + /// Reading all rows + All, + /// Use a bitmask to back the selection (dense selections) + Mask(MaskCursor), + /// Use a queue of selectors to back the selection (sparse selections) + Selectors(SelectorsCursor), +} + +impl RowSelectionCursor { + /// Create a [`MaskCursor`] cursor backed by a bitmask, from an existing set of selectors + pub(crate) fn new_mask_from_selectors(selectors: Vec) -> Self { + Self::Mask(MaskCursor { + mask: boolean_mask_from_selectors(&selectors), + position: 0, + }) + } + + /// Create a [`RowSelectionCursor::Selectors`] from the provided selectors + pub(crate) fn new_selectors(selectors: Vec) -> Self { + Self::Selectors(SelectorsCursor { + selectors: selectors.into(), + position: 0, + }) + } + + /// Create a cursor that selects all rows + pub(crate) fn new_all() -> Self { + Self::All } }