diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index a8688e8af83..a804be30549 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -38,11 +38,13 @@ use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder}; +use crate::arrow::arrow_reader::read_step::ReadStep; mod filter; mod read_plan; mod selection; pub mod statistics; +pub mod read_step; /// Builder for constructing Parquet readers that decode into [Apache Arrow] /// arrays. @@ -808,54 +810,50 @@ impl ParquetRecordBatchReader { /// Returns `Result>` rather than `Option>` to /// simplify error handling with `?` fn next_inner(&mut self) -> Result> { + let mut end_of_stream = false; let mut read_records = 0; let batch_size = self.batch_size(); - match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); - if front.skip { - let skipped = self.array_reader.skip_records(front.row_count)?; - - if skipped != front.row_count { - return Err(general_err!( - "failed to skip rows, expected {}, got {}", - front.row_count, - skipped - )); - } - continue; - } + while read_records < batch_size { + let Some(step) = self.read_plan.next() else { + end_of_stream = true; + break; + }; - //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. - //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 - if front.row_count == 0 { - continue; + + match step { + ReadStep::Skip(row_count) => { + let skipped = self.array_reader.skip_records(row_count)?; + + if skipped != row_count { + return Err(general_err!( + "Internal Error: failed to skip rows, expected {row_count}, got {skipped}", + )); + } + } + ReadStep::Read(row_count) => { + let read = self.array_reader.read_records(row_count)?; + if read == 0 { + end_of_stream = true; + break; } - // try to read record - let need_read = batch_size - read_records; - let to_read = match front.row_count.checked_sub(need_read) { - Some(remaining) if remaining != 0 => { - // if page row count less than batch_size we must set batch size to page row count. - // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); - need_read - } - _ => front.row_count, - }; - match self.array_reader.read_records(to_read)? { - 0 => break, - rec => read_records += rec, - }; + read_records += read } - } - None => { - self.array_reader.read_records(batch_size)?; - } - }; + ReadStep::Mask{..} => { + todo!(); + } + }; + } let array = self.array_reader.consume_batch()?; + + // Reader should read exactly `batch_size` records except for last batch + if !end_of_stream && (read_records != batch_size) { + return Err(general_err!( + "Internal Error: unexpected read count. Expected {batch_size} got {read_records}" + )); + } + let struct_array = array.as_struct_opt().ok_or_else(|| { ArrowError::ParquetError("Struct array reader should return struct array".to_string()) })?; diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index cf5d8338503..29d7fd79787 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -25,14 +25,14 @@ use crate::arrow::arrow_reader::{ use crate::errors::{ParquetError, Result}; use arrow_array::Array; use arrow_select::filter::prep_null_mask_filter; -use std::collections::VecDeque; +use crate::arrow::arrow_reader::read_step::{OptimizedReadSteps, ReadStep, ReadSteps}; /// A builder for [`ReadPlan`] #[derive(Clone)] pub(crate) struct ReadPlanBuilder { batch_size: usize, - /// Current to apply, includes all filters - selection: Option, + /// Current steps to apply, includes all filters + steps: Option, } impl ReadPlanBuilder { @@ -40,25 +40,25 @@ impl ReadPlanBuilder { pub(crate) fn new(batch_size: usize) -> Self { Self { batch_size, - selection: None, + steps: None, } } - /// Set the current selection to the given value + /// Set the current steps to the given row selection, if any pub(crate) fn with_selection(mut self, selection: Option) -> Self { - self.selection = selection; + self.steps = selection.map(ReadSteps::from); self } /// Returns the current selection, if any pub(crate) fn selection(&self) -> Option<&RowSelection> { - self.selection.as_ref() + None + // TODO + // self.selection.as_ref() } - /// Specifies the number of rows in the row group, before filtering is applied. - /// - /// Returns a [`LimitedReadPlanBuilder`] that can apply - /// offset and limit. + /// Returns a [`LimitedReadPlanBuilder`] to apply offset and limit to the in + /// progress plan. /// /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to this /// selection. @@ -68,15 +68,17 @@ impl ReadPlanBuilder { /// Returns true if the current plan selects any rows pub(crate) fn selects_any(&self) -> bool { - self.selection + self.steps .as_ref() - .map(|s| s.selects_any()) + .map(ReadSteps::selects_any) + // no steps means all rows are selected .unwrap_or(true) } /// Returns the number of rows selected, or `None` if all rows are selected. pub(crate) fn num_rows_selected(&self) -> Option { - self.selection.as_ref().map(|s| s.row_count()) + self.steps.as_ref().map(ReadSteps::num_selected) + } /// Evaluates an [`ArrowPredicate`], updating this plan's `selection` @@ -112,6 +114,8 @@ impl ReadPlanBuilder { }; } + // TODO potentially make this more efficient + let raw = RowSelection::from_filters(&filters); self.selection = match self.selection.take() { Some(selection) => Some(selection.and_then(&raw)), @@ -124,18 +128,35 @@ impl ReadPlanBuilder { pub(crate) fn build(mut self) -> ReadPlan { // If selection is empty, truncate if !self.selects_any() { - self.selection = Some(RowSelection::from(vec![])); + self.steps = Some(ReadSteps::empty()); } let Self { batch_size, - selection, + steps, } = self; - let selection = selection.map(|s| s.trim().into()); + // If the batch size is 0, read "all rows" + if batch_size == 0 { + return ReadPlan::All { batch_size: 0 }; + } - ReadPlan { - batch_size, - selection, + // If no selection is provided, read all rows + let Some(steps) = steps else { + return ReadPlan::All { batch_size }; + }; + + let iterator = OptimizedReadSteps::new(batch_size, steps); + ReadPlan::Subset { iterator } + } +} + + +impl From for ReadStep { + fn from(value: RowSelector) -> Self { + if value.skip { + Self::Skip(value.row_count) + } else { + Self::Read(value.row_count) } } } @@ -190,20 +211,20 @@ impl LimitedReadPlanBuilder { // If the selection is empty, truncate if !inner.selects_any() { - inner.selection = Some(RowSelection::from(vec![])); + inner.steps = Some(ReadSteps::empty()); } // If an offset is defined, apply it to the `selection` if let Some(offset) = offset { - inner.selection = Some(match row_count.checked_sub(offset) { - None => RowSelection::from(vec![]), + inner.steps = Some(match row_count.checked_sub(offset) { + None => ReadSteps::empty(), Some(remaining) => inner - .selection - .map(|selection| selection.offset(offset)) + .steps + .map(|steps| steps.offset(offset)) .unwrap_or_else(|| { - RowSelection::from(vec![ - RowSelector::skip(offset), - RowSelector::select(remaining), + ReadSteps::from(vec![ + ReadStep::Skip(offset), + ReadStep::Read(remaining), ]) }), }); @@ -211,12 +232,12 @@ impl LimitedReadPlanBuilder { // If a limit is defined, apply it to the final `selection` if let Some(limit) = limit { - inner.selection = Some( + inner.steps = Some( inner - .selection - .map(|selection| selection.limit(limit)) + .steps + .map(|steps| steps.limit(limit)) .unwrap_or_else(|| { - RowSelection::from(vec![RowSelector::select(limit.min(row_count))]) + ReadSteps::from(vec![ReadStep::Read(limit.min(row_count))]) }), ); } @@ -225,25 +246,421 @@ impl LimitedReadPlanBuilder { } } -/// A plan reading specific rows from a Parquet Row Group. +/// A plan for reading specific rows from a Parquet Row Group. /// /// See [`ReadPlanBuilder`] to create `ReadPlan`s -pub(crate) struct ReadPlan { - /// The number of rows to read in each batch - batch_size: usize, - /// Row ranges to be selected from the data source - selection: Option>, +/// +/// Also, note the `ReadPlan` is an iterator over [`RowSelector`]s. +#[derive(Debug)] +pub(crate) enum ReadPlan { + /// Read all rows in `batch_sized` chunks + All { + /// The number of rows to read in each batch + batch_size: usize, + }, + /// Read only a specific subset of rows + Subset { iterator: OptimizedReadSteps }, } -impl ReadPlan { - /// Returns a mutable reference to the selection, if any - pub(crate) fn selection_mut(&mut self) -> Option<&mut VecDeque> { - self.selection.as_mut() +impl Iterator for ReadPlan { + type Item = ReadStep; + + fn next(&mut self) -> Option { + match self { + // If we are reading all rows, return a selector that selects + // the next batch_size rows + Self::All { batch_size } => Some(ReadStep::Read(*batch_size)), + Self::Subset { iterator } => iterator.next(), + } } +} +impl ReadPlan { /// Return the number of rows to read in each output batch #[inline(always)] pub fn batch_size(&self) -> usize { - self.batch_size + match self { + Self::All { batch_size } => *batch_size, + Self::Subset { iterator } => iterator.batch_size(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_read_plan_select_all() { + TestCase::new() + .with_batch_size(100) + .with_empty_initial_selection() + .with_empty_expected_selection() + .run() + } + + #[test] + fn test_read_plan_empty_batch_size() { + TestCase::new() + .with_batch_size(0) + .with_row_count(0) + .with_empty_initial_selection() + .with_empty_expected_selection() + .run() + } + + #[test] + fn test_read_plan_select_only_empty() { + TestCase::new() + .with_batch_size(100) + .with_initial_selection(Some([RowSelector::skip(0)])) + .with_expected_selection(Some([])) + .run() + } + + #[test] + fn test_read_plan_select_subset() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + // filter out 50 rows in the middle + .with_initial_selection(Some([ + RowSelector::select(150), + RowSelector::skip(50), + RowSelector::select(100), + ])) + .with_expected_selection(Some([ + // broken up into batch_size chunks + RowSelector::select(100), + // second batch + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(50), + // third batch has 50 as we filtered out 50 rows + RowSelector::select(50), + ])) + .run() + } + + #[test] + fn test_read_plan_select_batch_boundaries() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + // select all but 50 rows in the middle using 50 row batches + .with_initial_selection(Some([ + RowSelector::select(50), + RowSelector::select(25), + RowSelector::select(25), + RowSelector::select(50), + RowSelector::skip(10), + RowSelector::skip(30), + RowSelector::skip(10), + RowSelector::select(50), + RowSelector::select(50), + ])) + .with_expected_selection(Some([ + // broken up into batch_size chunks, combined + RowSelector::select(100), + // second batch + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(50), + // third batch + RowSelector::select(50), + ])) + .run() + } + + #[test] + fn test_read_plan_filters_zero_row_selects() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(0), + RowSelector::select(125), + RowSelector::select(0), + RowSelector::skip(0), + RowSelector::skip(50), + RowSelector::select(25), + ])) + // empty selectors have been filtered out + .with_expected_selection(Some([ + RowSelector::select(100), + RowSelector::select(25), + RowSelector::skip(50), + RowSelector::select(25), + ])) + .run() + } + + #[test] + fn test_read_plan_filters_zero_row_end_skips() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(125), + RowSelector::skip(0), + RowSelector::skip(0), + ])) + .with_expected_selection(Some([RowSelector::select(100), RowSelector::select(25)])) + .run() + } + + #[test] + fn test_read_plan_with_limit_no_skip() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(200), // limit in middle of this select + RowSelector::skip(50), + RowSelector::select(50), + ])) + .with_limit(100) + .with_expected_selection(Some([RowSelector::select(100)])) + .run() + } + + #[test] + fn test_read_plan_with_limit_after_skip() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(150), + RowSelector::skip(50), // limit is hit after this skip + RowSelector::select(10), + ])) + .with_limit(200) + .with_expected_selection(Some([ + RowSelector::select(100), + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(10), + ])) + .run() + } + + #[test] + fn test_read_plan_with_limit_after_skip_remain() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(150), + RowSelector::skip(50), + RowSelector::select(100), // limit includes part but not all of this + ])) + .with_limit(175) + .with_expected_selection(Some([ + RowSelector::select(100), + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(25), + ])) + .run() + } + + #[test] + fn test_read_plan_with_offset() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(100), + ])) + .with_offset(25) // skip 25 rows + .with_expected_selection(Some([ + RowSelector::skip(25), // offset + RowSelector::select(25), + RowSelector::skip(50), + RowSelector::select(75), + // start second batch + RowSelector::select(25), + ])) + .run() + } + + #[test] + fn test_read_plan_with_limit_and_offset() { + TestCase::new() + .with_batch_size(100) + .with_row_count(300) + .with_initial_selection(Some([ + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(100), + ])) + .with_offset(25) // skip 25 rows + .with_limit(110) + .with_expected_selection(Some([ + RowSelector::skip(25), // offset + RowSelector::select(25), + RowSelector::skip(50), + RowSelector::select(75), + // start second batch + RowSelector::select(10), // limited to 110 + ])) + .run() + } + + // test filtering + + /// Test harness for `ReadPlanBuilder` + #[derive(Debug, Default)] + struct TestCase { + batch_size: usize, + row_count: usize, + /// Optional limit to apply to plan + limit: Option, + /// Optional offset to apply to plan + offset: Option, + initial_selection: Option>, + /// if Some, expect ReadPlan::Subset + /// if None, expect ReadPlan::All + expected_selection: Option>, + } + + impl TestCase { + /// Create a new test case + fn new() -> Self { + Default::default() + } + + /// Set the batch size + fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + + /// Set the row count + fn with_row_count(mut self, row_count: usize) -> Self { + self.row_count = row_count; + self + } + + /// Specify a limit to apply to the read plan + fn with_limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } + + /// Specify an offset to apply to the read plan + fn with_offset(mut self, offset: usize) -> Self { + self.offset = Some(offset); + self + } + + /// Set the initial selection to the given set of selectors + fn with_initial_selection>( + mut self, + initial: Option, + ) -> Self { + self.initial_selection = initial.map(|initial| initial.into_iter().collect()); + self + } + /// Set the initial selection to None (used to make the tests self documenting) + fn with_empty_initial_selection(mut self) -> Self { + self.initial_selection = None; + self + } + + /// Set the expected plan to be RowPlan::Subset given set of selectors + fn with_expected_selection>( + mut self, + expected: Option, + ) -> Self { + self.expected_selection = expected.map(|expected| expected.into_iter().collect()); + self + } + /// Set the expected selection to None (used to make the tests self documenting) + fn with_empty_expected_selection(mut self) -> Self { + self.expected_selection = None; + self + } + + fn run(self) { + let Self { + batch_size, + row_count, + limit, + offset, + initial_selection, + expected_selection, + } = self; + + let initial_selection = initial_selection.map(RowSelection::from); + let plan = ReadPlanBuilder::new(batch_size) + .with_selection(initial_selection) + .limited(row_count) + .with_limit(limit) + .with_offset(offset) + .build_limited() + .build(); + + match expected_selection { + None => { + let expected_batch_size = batch_size; + assert!( + matches!(plan, ReadPlan::All { batch_size } if batch_size == expected_batch_size), + "Expected ReadPlan::All {{ batch_size={batch_size} }}, got {plan:#?}" + ); + } + Some(expected) => { + // Gather the generated selectors to compare with the expected + let actual: Vec = plan.into_iter() + .map(|read_step| { + match read_step{ + ReadStep::Read(n) => RowSelector::select(n), + ReadStep::Skip(n) => RowSelector::skip(n), + ReadStep::Mask{mask, num_selected} => { + todo!() + } + } + }) + .collect(); + Self::validate_selection(&actual, batch_size); + // use debug formatting with newlines to generate easier to grok diffs + // if the test fails + assert_eq!(format!("{actual:#?}"), format!("{expected:#?}")); + // also use assert_eq! to ensure equality + assert_eq!(actual, expected); + } + }; + } + + /// Validate that the output selections obey the rules + fn validate_selection(selectors: &[RowSelector], batch_size: usize) { + // 1. no empty selections + for selector in selectors.iter() { + assert!(selector.row_count > 0, "{selector:?} empty selection"); + } + + // 2. no selections that span batch_size boundaries + let mut current_count = 0; + for selector in selectors.iter() { + if selector.skip { + continue; + } + current_count += selector.row_count; + assert!( + current_count <= batch_size, + "current_count {current_count} > batch_size {batch_size}. Plan:\n{selectors:#?}" + ); + if current_count == batch_size { + current_count = 0; + } + } + + // 3. no trailing skip selections + if let Some(last) = selectors.last() { + assert!(!last.skip, "last selector {last:?} is a skip selector"); + } + } } } diff --git a/parquet/src/arrow/arrow_reader/read_step.rs b/parquet/src/arrow/arrow_reader/read_step.rs new file mode 100644 index 00000000000..ffc0a4e3f83 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/read_step.rs @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use arrow_buffer::BooleanBuffer; +use crate::arrow::arrow_reader::{RowSelection, RowSelector}; + +/// How to select the next batch of rows to read from the Parquet file +/// +/// This is the internal counterpart to [`RowSelector`] except that it +/// also has the `Mask` variant which is used to apply a filter mask +/// +/// This allows the reader to dynamically choose between decoding strategies +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum ReadStep { + /// Read n rows + Read(usize), + /// Skip n rows + Skip(usize), + /// Reads mask.len() rows then applies the filter mask to select just the desired + /// rows. + /// + /// Any row with a 1 value in the mask will be selected and included + /// in the output batch. + /// + /// This is used in situations where the overhead of preferentially decoding + /// only the selected rows is higher than decoding all rows and then + /// applying a mask via filter. + Mask { + mask: BooleanBuffer, + num_selected: usize, + }, +} + + impl ReadStep { + /// returns true if this step selects rows + pub(crate) fn selects_any(&self) -> bool { + match self { + ReadStep::Read(_) => true, + ReadStep::Skip(_) => false, + ReadStep::Mask { mask: _, num_selected } => *num_selected > 0 + } + } + } + + +/// A list of [`ReadStep`]s that describe how to read from a Parquet file +/// +/// This is the internal counterpart to [`RowSelection`] +#[derive(Debug, Clone, PartialEq)] +pub (crate) struct ReadSteps { + /// The list of read steps + steps: Vec, +} + +impl ReadSteps { + /// Create a new [`ReadSteps`] instance + pub (crate) fn new(steps: Vec) -> Self { + Self { steps } + } + + /// Create a new empty [`ReadSteps`] instance + pub (crate) fn empty() -> Self { + Self::new(vec![]) + } + + /// Add a read step to the list + pub (crate) fn add(&mut self, step: ReadStep) { + self.steps.push(step); + } + + /// Get the list of read steps + pub (crate) fn steps(&self) -> &[ReadStep] { + &self.steps + } + + /// Returns true if any step selects rows + pub(crate) fn selects_any(&self) -> bool { + self.steps.iter().any(ReadStep::selects_any) + } + + /// Returns the number of rows selected + pub(crate) fn num_selected(&self) -> usize { + self.steps.iter().map(|step| { + match step { + ReadStep::Read(n) => *n, + ReadStep::Skip(_) => 0, + ReadStep::Mask{ mask: _, num_selected} => *num_selected, + } + }).sum() + } + + /// Applies an offset to this [`ReadSteps`], skipping the first `offset` selected rows + pub(crate) fn offset(mut self, offset: usize) -> Self { + if offset == 0 { + return self; + } + + let mut selected_count = 0; + let mut skipped_count = 0; + + // Find the index where the selector exceeds the row count + let find = self + .steps + .iter() + .position(|step| match step { + ReadStep::Skip(row_count) => { + skipped_count += row_count; + false + } + ReadStep::Read(row_count) => { + selected_count += row_count; + selected_count > offset + } + ReadStep::Mask {..} => todo!(), + }); + + let split_idx = match find { + Some(idx) => idx, + None => { + self.steps.clear(); + return self; + } + }; + + let mut steps = Vec::with_capacity(self.steps.len() - split_idx + 1); + steps.push(ReadStep::Skip(skipped_count + offset)); + steps.push(ReadStep::Read(selected_count - offset)); + steps.extend_from_slice(&self.steps[split_idx + 1..]); + + Self { steps } + } + + /// Limit this [`ReadSteps`] to only select `limit` rows + pub(crate) fn limit(mut self, mut limit: usize) -> Self { + if limit == 0 { + self.steps.clear(); + } + + for (idx, step) in self.steps.iter_mut().enumerate() { + match step { + ReadStep::Read(row_count) => { + if *row_count >= limit { + *row_count = limit; // update row count + self.steps.truncate(idx + 1); + break; + } + } + ReadStep::Skip(row_count) => { + limit -= *row_count; + } + ReadStep::Mask { mask: _, num_selected } => { + todo!() + } + } + } + self + } + + /// return the inner steps + pub(crate) fn into_inner(self) -> Vec { + self.steps + } +} + +impl From> for ReadSteps { + fn from(value: Vec) -> Self { + Self::new(value) + } +} + +impl From> for ReadSteps { + fn from(selection: Vec) -> Self { + let steps = selection + .into_iter() + .map(ReadStep::from) + .collect(); + Self::new(steps) + } +} + +impl From for ReadSteps { + fn from(selection: RowSelection) -> Self { + selection.into_inner().into() + } +} + +impl From for Vec { + fn from(steps: ReadSteps) -> Self { + steps.into_inner() + } +} + + +/// Incrementally returns [`ReadStep`]s that describe reading from a Parquet file. +/// +/// The returned stream of [`ReadStep`]s that is guaranteed to have: +/// 1. No empty selections (that select no rows) +/// 2. No selections that span batch_size boundaries +/// 3. No trailing skip selections +/// +/// For example, if the `batch_size` is 100 and we are selecting all 200 rows +/// from a Parquet file, the selectors will be: +/// - `ReadStep::Read(100) <-- forced break at batch_size boundary` +/// - `ReadStep::Skip(100)` +#[derive(Debug, Clone)] +pub(crate) struct OptimizedReadSteps { + /// how many rows to read in each batch + batch_size: usize, + /// how many records have been read by RowSelection in the "current" batch + read_records: usize, + /// Input selectors to read from + input_steps: VecDeque, +} + +impl Iterator for OptimizedReadSteps { + type Item = ReadStep; + + fn next(&mut self) -> Option { + while let Some(mut front) = self.input_steps.pop_front() { + match front { + // RowSelectors with row_count = 0 terminate the read, so skip such + // entries. See https://github.com/apache/arrow-rs/issues/2669 + ReadStep::Read(row_count) if row_count == 0 => { + continue; + } + ReadStep::Skip(_) => return Some(front), + ReadStep::Read(mut row_count) => { + let need_read = self.batch_size - self.read_records; + + // if there are more rows in the current RowSelector than needed to + // finish the batch, split it up + if row_count > need_read { + // Part 1: return remaining rows to the front of the queue + let remaining = row_count - need_read; + self.input_steps + .push_front(ReadStep::Read(remaining)); + // Part 2: adjust the current selector to read the rows we need + row_count = need_read; + } + + self.read_records += row_count; + // if read enough records to complete a batch, emit + if self.read_records == self.batch_size { + self.read_records = 0; + } + + return Some(ReadStep::Read(row_count)); + } + ReadStep::Mask { mask, num_selected } => { + todo!() + } + } + } + // no more selectors to read, end of stream + None + } +} + +impl OptimizedReadSteps { + pub(crate) fn new(batch_size: usize, input_steps: ReadSteps) -> Self { + let mut input_steps = VecDeque::from(input_steps.into_inner()); + // trim any trailing empty selectors + while input_steps.back().map(|step| !step.selects_any()).unwrap_or(false) { + input_steps.pop_back(); + } + + Self { + batch_size, + read_records: 0, + input_steps, + } + } + + /// Return the number of rows to read in each output batch + pub(crate) fn batch_size(&self) -> usize { + self.batch_size + } +} diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index c53d47be2e5..1b36eb797ce 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -21,6 +21,9 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; +/// Represents reading or skipping some number of contiguous +/// rows when decoding a parquet file +/// /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when /// scanning a parquet file #[derive(Debug, Clone, Copy, Eq, PartialEq)] @@ -358,14 +361,6 @@ impl RowSelection { self.selectors.iter().any(|x| !x.skip) } - /// Trims this [`RowSelection`] removing any trailing skips - pub(crate) fn trim(mut self) -> Self { - while self.selectors.last().map(|x| x.skip).unwrap_or(false) { - self.selectors.pop(); - } - self - } - /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows pub(crate) fn offset(mut self, offset: usize) -> Self { if offset == 0 { @@ -441,6 +436,11 @@ impl RowSelection { pub fn skipped_row_count(&self) -> usize { self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() } + + /// Returns the inner selectors + pub fn into_inner(self) -> Vec { + self.selectors + } } impl From> for RowSelection { @@ -481,7 +481,7 @@ impl FromIterator for RowSelection { impl From for Vec { fn from(r: RowSelection) -> Self { - r.selectors + r.into_inner() } }