Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 64 additions & 67 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,83 +1072,80 @@ impl ParquetRecordBatchReader {
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
let mut read_records = 0;
let batch_size = self.batch_size();
match self.read_plan.selection_mut() {
Some(selection_cursor) => {
if selection_cursor.is_mask_backed() {
// Stream the record batch reader using contiguous segments of the selection
// mask, avoiding the need to materialize intermediate `RowSelector` ranges.
while !selection_cursor.is_empty() {
let Some(mask_chunk) = selection_cursor.next_mask_chunk(batch_size) else {
return Ok(None);
};

if mask_chunk.initial_skip > 0 {
let skipped =
self.array_reader.skip_records(mask_chunk.initial_skip)?;
if skipped != mask_chunk.initial_skip {
return Err(general_err!(
"failed to skip rows, expected {}, got {}",
mask_chunk.initial_skip,
skipped
));
}
}

if mask_chunk.chunk_rows == 0 {
if selection_cursor.is_empty() && mask_chunk.selected_rows == 0 {
return Ok(None);
}
continue;
}

let mask = selection_cursor
.mask_values_for(&mask_chunk)
.ok_or_else(|| general_err!("row selection mask out of bounds"))?;
match self.read_plan.row_selection_cursor_mut() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you review this with a "whitespace-blind" diff, it is easier to see what changed:

https://github.com/hhhizzz/arrow-rs/pull/8/files?w=1

Basically, the three cases are now handled via three enum variants (`Mask`, `Selectors`, and `All`), and the code is a single `match` on `row_selection_cursor_mut()` rather than a match on `Some(selection_cursor)` followed by a `selection_cursor.is_mask_backed()` check.

RowSelectionCursor::Mask(mask_cursor) => {
// Stream the record batch reader using contiguous segments of the selection
// mask, avoiding the need to materialize intermediate `RowSelector` ranges.
while !mask_cursor.is_empty() {
let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
return Ok(None);
};

let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
if read == 0 {
if mask_chunk.initial_skip > 0 {
let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
if skipped != mask_chunk.initial_skip {
return Err(general_err!(
"reached end of column while expecting {} rows",
mask_chunk.chunk_rows
"failed to skip rows, expected {}, got {}",
mask_chunk.initial_skip,
skipped
));
}
if read != mask_chunk.chunk_rows {
return Err(general_err!(
"insufficient rows read from array reader - expected {}, got {}",
mask_chunk.chunk_rows,
read
));
}

if mask_chunk.chunk_rows == 0 {
if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
return Ok(None);
}
continue;
}

let array = self.array_reader.consume_batch()?;
// The column reader exposes the projection as a struct array; convert this
// into a record batch before applying the boolean filter mask.
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct array".to_string(),
)
})?;
let mask = mask_cursor.mask_values_for(&mask_chunk)?;

let filtered_batch =
filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
if read == 0 {
return Err(general_err!(
"reached end of column while expecting {} rows",
mask_chunk.chunk_rows
));
}
if read != mask_chunk.chunk_rows {
return Err(general_err!(
"insufficient rows read from array reader - expected {}, got {}",
mask_chunk.chunk_rows,
read
));
}

if filtered_batch.num_rows() != mask_chunk.selected_rows {
return Err(general_err!(
"filtered rows mismatch selection - expected {}, got {}",
mask_chunk.selected_rows,
filtered_batch.num_rows()
));
}
let array = self.array_reader.consume_batch()?;
// The column reader exposes the projection as a struct array; convert this
// into a record batch before applying the boolean filter mask.
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct array".to_string(),
)
})?;

if filtered_batch.num_rows() == 0 {
continue;
}
let filtered_batch =
filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

if filtered_batch.num_rows() != mask_chunk.selected_rows {
return Err(general_err!(
"filtered rows mismatch selection - expected {}, got {}",
mask_chunk.selected_rows,
filtered_batch.num_rows()
));
}

return Ok(Some(filtered_batch));
if filtered_batch.num_rows() == 0 {
continue;
}

return Ok(Some(filtered_batch));
}
while read_records < batch_size && !selection_cursor.is_empty() {
let front = selection_cursor.next_selector();
}
RowSelectionCursor::Selectors(selectors_cursor) => {
while read_records < batch_size && !selectors_cursor.is_empty() {
let front = selectors_cursor.next_selector();
if front.skip {
let skipped = self.array_reader.skip_records(front.row_count)?;

Expand All @@ -1174,7 +1171,7 @@ impl ParquetRecordBatchReader {
Some(remaining) if remaining != 0 => {
// if page row count less than batch_size we must set batch size to page row count.
// add check avoid dead loop
selection_cursor.return_selector(RowSelector::select(remaining));
selectors_cursor.return_selector(RowSelector::select(remaining));
need_read
}
_ => front.row_count,
Expand All @@ -1185,7 +1182,7 @@ impl ParquetRecordBatchReader {
};
}
}
None => {
RowSelectionCursor::All => {
self.array_reader.read_records(batch_size)?;
}
};
Expand Down
51 changes: 37 additions & 14 deletions parquet/src/arrow/arrow_reader/read_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::arrow::arrow_reader::{
use crate::errors::{ParquetError, Result};
use arrow_array::Array;
use arrow_select::filter::prep_null_mask_filter;
use std::collections::VecDeque;

/// A builder for [`ReadPlan`]
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -89,6 +90,8 @@ impl ReadPlanBuilder {
}

/// Returns the preferred [`RowSelectionStrategy`] for materialising the current selection.
///
/// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
pub fn preferred_selection_strategy(&self) -> RowSelectionStrategy {
match self.selection_strategy {
RowSelectionStrategy::Selectors => RowSelectionStrategy::Selectors,
Expand Down Expand Up @@ -167,25 +170,35 @@ impl ReadPlanBuilder {
if !self.selects_any() {
self.selection = Some(RowSelection::from(vec![]));
}
let selection_strategy = match self.selection_strategy {
RowSelectionStrategy::Auto { .. } => self.preferred_selection_strategy(),
strategy => strategy,
};

// Preferred strategy must not be Auto
let selection_strategy = self.preferred_selection_strategy();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`self.preferred_selection_strategy()` already returns the specified strategy when it is not `Auto`, so I don't think there is any reason to repeat that logic here.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good find! It looks like I updated the code too many times, causing some duplication.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, thank you for bearing with us over the review process


let Self {
batch_size,
selection,
selection_strategy: _,
} = self;

let selection = selection.map(|s| {
let trimmed = s.trim();
let selectors: Vec<RowSelector> = trimmed.into();
RowSelectionCursor::new(selectors, selection_strategy)
});
let selection = selection.map(|s| s.trim());

let row_selection_cursor = selection
.map(|s| {
let trimmed = s.trim();
let selectors: Vec<RowSelector> = trimmed.into();
match selection_strategy {
RowSelectionStrategy::Mask => {
RowSelectionCursor::new_mask_from_selectors(selectors)
}
RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
RowSelectionStrategy::Auto { .. } => unreachable!(),
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to figure out some way to encode in the type system the fact that the `RowSelectionStrategy` will never be `Auto` at this point. I am trying out some things in follow-on PRs.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've thought about that; one approach might be to add a new enum here, called something like `RowSelectionBackendType`.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with it -- what I came up with was to split out the policy from the actual resolved strategy. PR incoming

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#9

}
})
.unwrap_or(RowSelectionCursor::new_all());

ReadPlan {
batch_size,
selection,
row_selection_cursor,
}
}
}
Expand Down Expand Up @@ -283,13 +296,23 @@ pub struct ReadPlan {
/// The number of rows to read in each batch
batch_size: usize,
/// Row ranges to be selected from the data source
selection: Option<RowSelectionCursor>,
row_selection_cursor: RowSelectionCursor,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than an `Option`, I added a `RowSelectionCursor::All` variant for the case where all rows are selected.

}

impl ReadPlan {
/// Returns a mutable reference to the selection, if any
pub fn selection_mut(&mut self) -> Option<&mut RowSelectionCursor> {
self.selection.as_mut()
/// Returns a mutable reference to the selection selectors, if any
#[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also implemented my suggestion here

This avoids a backwards-incompatible change to the public API.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for proposing the new API.

pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
    // Only a selector-backed cursor exposes a selector queue; any other cursor
    // variant has nothing to hand out, so the caller receives `None`.
    match &mut self.row_selection_cursor {
        RowSelectionCursor::Selectors(cursor) => Some(cursor.selectors_mut()),
        _ => None,
    }
}

/// Returns a mutable reference to the row selection cursor.
///
/// The cursor is always present: the "all rows selected" case is represented by
/// the [`RowSelectionCursor::All`] variant rather than by an `Option`, so callers
/// `match` on the returned cursor (`Mask`, `Selectors`, or `All`) directly.
pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
&mut self.row_selection_cursor
}

/// Return the number of rows to read in each output batch
Expand Down
Loading
Loading