Skip to content

Commit 3d5428d

Browse files
vustef, jkylling, scovich, and alamb
authored
General virtual columns support + row numbers as a first use-case (#8715)
Based on #7307. # Which issue does this PR close? - Closes #7299 # Rationale for this change We need row numbers for many of the downstream features, e.g. computing a unique row identifier in Iceberg. # What changes are included in this PR? New API to get row numbers as a virtual column: ``` let file = File::open(path).unwrap(); let row_number_field = Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber); let options = ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]); let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options) .unwrap() .build() .expect("Could not create reader"); reader .collect::<Result<Vec<_>, _>>() .expect("Could not read") ``` This column is defined as an extension type. Parquet metadata is propagated to the array builder to compute first row indexes. A new Virtual column kind is included in addition to Primitive and Group. # Are these changes tested? Yes # Are there any user-facing changes? This is a user-facing feature, and docstrings have been added. No breaking changes — at least I tried not to introduce any, by creating a duplicate of the public method to add more parameters. --------- Co-authored-by: Jonas Irgens Kylling <[email protected]> Co-authored-by: scovich <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent 5133cb9 commit 3d5428d

File tree

14 files changed

+1104
-49
lines changed

14 files changed

+1104
-49
lines changed

parquet/examples/read_with_rowgroup.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use parquet::arrow::async_reader::AsyncFileReader;
2222
use parquet::arrow::{ProjectionMask, parquet_to_arrow_field_levels};
2323
use parquet::column::page::{PageIterator, PageReader};
2424
use parquet::errors::{ParquetError, Result};
25-
use parquet::file::metadata::RowGroupMetaData;
25+
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
2626
use parquet::file::reader::{ChunkReader, Length};
2727
use parquet::file::serialized_reader::SerializedPageReader;
2828
use std::sync::Arc;
@@ -35,10 +35,11 @@ async fn main() -> Result<()> {
3535
let mut file = File::open(&path).await.unwrap();
3636

3737
// The metadata could be cached in other places, this example only shows how to read
38-
let metadata = file.get_metadata(None).await?;
38+
let metadata = Arc::try_unwrap(file.get_metadata(None).await?).unwrap();
3939

40-
for rg in metadata.row_groups() {
41-
let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all());
40+
for row_group_idx in 0..metadata.row_groups().len() {
41+
let mut rowgroup =
42+
InMemoryRowGroup::create(metadata.clone(), row_group_idx, ProjectionMask::all());
4243
rowgroup.async_fetch_data(&mut file, None).await?;
4344
let reader = rowgroup.build_reader(1024, None)?;
4445

@@ -100,14 +101,15 @@ impl ChunkReader for ColumnChunkData {
100101

101102
#[derive(Clone)]
102103
pub struct InMemoryRowGroup {
103-
pub metadata: RowGroupMetaData,
104+
metadata: ParquetMetaData,
105+
row_group_idx: usize,
104106
mask: ProjectionMask,
105107
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
106108
}
107109

108110
impl RowGroups for InMemoryRowGroup {
109111
fn num_rows(&self) -> usize {
110-
self.metadata.num_rows() as usize
112+
self.row_group_metadata().num_rows() as usize
111113
}
112114

113115
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
@@ -118,7 +120,7 @@ impl RowGroups for InMemoryRowGroup {
118120
Some(data) => {
119121
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
120122
data.clone(),
121-
self.metadata.column(i),
123+
self.row_group_metadata().column(i),
122124
self.num_rows(),
123125
None,
124126
)?);
@@ -129,26 +131,44 @@ impl RowGroups for InMemoryRowGroup {
129131
}
130132
}
131133
}
134+
135+
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
136+
Box::new(std::iter::once(self.row_group_metadata()))
137+
}
138+
139+
fn metadata(&self) -> &ParquetMetaData {
140+
&self.metadata
141+
}
132142
}
133143

134144
impl InMemoryRowGroup {
135-
pub fn create(metadata: RowGroupMetaData, mask: ProjectionMask) -> Self {
136-
let column_chunks = metadata.columns().iter().map(|_| None).collect::<Vec<_>>();
145+
pub fn create(metadata: ParquetMetaData, row_group_idx: usize, mask: ProjectionMask) -> Self {
146+
let column_chunks = metadata
147+
.row_group(row_group_idx)
148+
.columns()
149+
.iter()
150+
.map(|_| None)
151+
.collect::<Vec<_>>();
137152

138153
Self {
139154
metadata,
155+
row_group_idx,
140156
mask,
141157
column_chunks,
142158
}
143159
}
144160

161+
pub fn row_group_metadata(&self) -> &RowGroupMetaData {
162+
self.metadata.row_group(self.row_group_idx)
163+
}
164+
145165
pub fn build_reader(
146166
&self,
147167
batch_size: usize,
148168
selection: Option<RowSelection>,
149169
) -> Result<ParquetRecordBatchReader> {
150170
let levels = parquet_to_arrow_field_levels(
151-
&self.metadata.schema_descr_ptr(),
171+
&self.row_group_metadata().schema_descr_ptr(),
152172
self.mask.clone(),
153173
None,
154174
)?;
@@ -163,7 +183,7 @@ impl InMemoryRowGroup {
163183
_selection: Option<&RowSelection>,
164184
) -> Result<()> {
165185
let mut vs = std::mem::take(&mut self.column_chunks);
166-
for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
186+
for (leaf_idx, meta) in self.row_group_metadata().columns().iter().enumerate() {
167187
if self.mask.leaf_included(leaf_idx) {
168188
let (start, len) = meta.byte_range();
169189
let data = reader.get_bytes(start..(start + len)).await?;

parquet/src/arrow/array_reader/builder.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,18 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
2626
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
2727
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
2828
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
29+
use crate::arrow::array_reader::row_number::RowNumberReader;
2930
use crate::arrow::array_reader::{
3031
ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
3132
PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
3233
make_byte_array_reader,
3334
};
3435
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
35-
use crate::arrow::schema::{ParquetField, ParquetFieldType};
36+
use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
3637
use crate::basic::Type as PhysicalType;
3738
use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
3839
use crate::errors::{ParquetError, Result};
40+
use crate::file::metadata::ParquetMetaData;
3941
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
4042

4143
/// Builder for [`CacheOptions`]
@@ -89,6 +91,8 @@ pub struct ArrayReaderBuilder<'a> {
8991
row_groups: &'a dyn RowGroups,
9092
/// Optional cache options for the array reader
9193
cache_options: Option<&'a CacheOptions<'a>>,
94+
/// Parquet metadata for computing virtual column values
95+
parquet_metadata: Option<&'a ParquetMetaData>,
9296
/// metrics
9397
metrics: &'a ArrowReaderMetrics,
9498
}
@@ -98,6 +102,7 @@ impl<'a> ArrayReaderBuilder<'a> {
98102
Self {
99103
row_groups,
100104
cache_options: None,
105+
parquet_metadata: None,
101106
metrics,
102107
}
103108
}
@@ -108,6 +113,12 @@ impl<'a> ArrayReaderBuilder<'a> {
108113
self
109114
}
110115

116+
/// Add parquet metadata to the builder for computing virtual column values
117+
pub fn with_parquet_metadata(mut self, parquet_metadata: &'a ParquetMetaData) -> Self {
118+
self.parquet_metadata = Some(parquet_metadata);
119+
self
120+
}
121+
111122
/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
112123
pub fn build_array_reader(
113124
&self,
@@ -153,6 +164,13 @@ impl<'a> ArrayReaderBuilder<'a> {
153164
Ok(Some(reader))
154165
}
155166
}
167+
ParquetFieldType::Virtual(virtual_type) => {
168+
// Virtual columns don't have data in the parquet file
169+
// They need to be built by specialized readers
170+
match virtual_type {
171+
VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
172+
}
173+
}
156174
ParquetFieldType::Group { .. } => match &field.arrow_type {
157175
DataType::Map(_, _) => self.build_map_reader(field, mask),
158176
DataType::Struct(_) => self.build_struct_reader(field, mask),
@@ -164,6 +182,18 @@ impl<'a> ArrayReaderBuilder<'a> {
164182
}
165183
}
166184

185+
fn build_row_number_reader(&self) -> Result<Box<dyn ArrayReader>> {
186+
let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
187+
ParquetError::General(
188+
"ParquetMetaData is required to read virtual row number columns.".to_string(),
189+
)
190+
})?;
191+
Ok(Box::new(RowNumberReader::try_new(
192+
parquet_metadata,
193+
self.row_groups.row_groups(),
194+
)?))
195+
}
196+
167197
/// Build array reader for map type.
168198
fn build_map_reader(
169199
&self,
@@ -439,6 +469,7 @@ impl<'a> ArrayReaderBuilder<'a> {
439469
mod tests {
440470
use super::*;
441471
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
472+
use crate::arrow::schema::virtual_type::RowNumber;
442473
use crate::file::reader::{FileReader, SerializedFileReader};
443474
use crate::util::test_common::file_util::get_test_file;
444475
use arrow::datatypes::Field;
@@ -455,6 +486,7 @@ mod tests {
455486
file_metadata.schema_descr(),
456487
ProjectionMask::all(),
457488
file_metadata.key_value_metadata(),
489+
&[],
458490
)
459491
.unwrap();
460492

@@ -472,4 +504,41 @@ mod tests {
472504

473505
assert_eq!(array_reader.get_data_type(), &arrow_type);
474506
}
507+
508+
#[test]
509+
fn test_create_array_reader_with_row_numbers() {
510+
let file = get_test_file("nulls.snappy.parquet");
511+
let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());
512+
513+
let file_metadata = file_reader.metadata().file_metadata();
514+
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
515+
let row_number_field = Arc::new(
516+
Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber),
517+
);
518+
let (_, fields) = parquet_to_arrow_schema_and_fields(
519+
file_metadata.schema_descr(),
520+
ProjectionMask::all(),
521+
file_metadata.key_value_metadata(),
522+
std::slice::from_ref(&row_number_field),
523+
)
524+
.unwrap();
525+
526+
let metrics = ArrowReaderMetrics::disabled();
527+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
528+
.with_parquet_metadata(file_reader.metadata())
529+
.build_array_reader(fields.as_ref(), &mask)
530+
.unwrap();
531+
532+
// Create arrow types
533+
let arrow_type = DataType::Struct(Fields::from(vec![
534+
Field::new(
535+
"b_struct",
536+
DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
537+
true,
538+
),
539+
(*row_number_field).clone(),
540+
]));
541+
542+
assert_eq!(array_reader.get_data_type(), &arrow_type);
543+
}
475544
}

parquet/src/arrow/array_reader/list_array.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ mod tests {
561561
schema,
562562
ProjectionMask::all(),
563563
file_metadata.key_value_metadata(),
564+
&[],
564565
)
565566
.unwrap();
566567

parquet/src/arrow/array_reader/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::arrow::record_reader::GenericRecordReader;
2727
use crate::arrow::record_reader::buffer::ValuesBuffer;
2828
use crate::column::page::PageIterator;
2929
use crate::column::reader::decoder::ColumnValueDecoder;
30+
use crate::file::metadata::ParquetMetaData;
3031
use crate::file::reader::{FilePageIterator, FileReader};
3132

3233
mod builder;
@@ -42,12 +43,14 @@ mod map_array;
4243
mod null_array;
4344
mod primitive_array;
4445
mod row_group_cache;
46+
mod row_number;
4547
mod struct_array;
4648

4749
#[cfg(test)]
4850
mod test_util;
4951

5052
// Note that this crate is public under the `experimental` feature flag.
53+
use crate::file::metadata::RowGroupMetaData;
5154
pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
5255
pub use byte_array::make_byte_array_reader;
5356
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
@@ -139,17 +142,35 @@ pub trait RowGroups {
139142
/// Returns a [`PageIterator`] for all pages in the specified column chunk
140143
/// across all row groups in this collection.
141144
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
145+
146+
/// Returns an iterator over the row groups in this collection
147+
///
148+
/// Note this may not include all row groups in [`Self::metadata`].
149+
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;
150+
151+
/// Returns the parquet metadata
152+
fn metadata(&self) -> &ParquetMetaData;
142153
}
143154

144155
impl RowGroups for Arc<dyn FileReader> {
145156
fn num_rows(&self) -> usize {
146-
self.metadata().file_metadata().num_rows() as usize
157+
FileReader::metadata(self.as_ref())
158+
.file_metadata()
159+
.num_rows() as usize
147160
}
148161

149162
fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
150163
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
151164
Ok(Box::new(iterator))
152165
}
166+
167+
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
168+
Box::new(FileReader::metadata(self.as_ref()).row_groups().iter())
169+
}
170+
171+
fn metadata(&self) -> &ParquetMetaData {
172+
FileReader::metadata(self.as_ref())
173+
}
153174
}
154175

155176
/// Uses `record_reader` to read up to `batch_size` records from `pages`

0 commit comments

Comments
 (0)