Skip to content

Commit d27295c

Browse files
kczimmJeadie
authored andcommitted
cherry-pick parquet patch (#94)
1 parent e58c24e commit d27295c

File tree

14 files changed

+108
-39
lines changed

14 files changed

+108
-39
lines changed

Cargo.lock

Lines changed: 17 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,11 @@ unexpected_cfgs = { level = "warn", check-cfg = [
230230
"cfg(tarpaulin_include)",
231231
] }
232232
unused_qualifications = "deny"
233+
[patch.crates-io]
234+
arrow = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
235+
arrow-buffer = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
236+
arrow-flight = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
237+
arrow-ipc = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
238+
arrow-ord = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
239+
arrow-schema = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
240+
parquet = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2

datafusion/common/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,10 @@ config_namespace! {
501501
/// rows decoded.
502502
pub enable_page_index: bool, default = true
503503

504+
/// (reading) If true, the parquet reader will tolerate missing page index metadata
505+
/// rather than error out that page index metadata was expected.
506+
pub tolerate_missing_page_index: bool, default = false
507+
504508
/// (reading) If true, the parquet reader attempts to skip entire row groups based
505509
/// on the predicate in the query and the metadata (min/max values) stored in
506510
/// the parquet file

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ impl ParquetOptions {
232232

233233
// not in WriterProperties
234234
enable_page_index: _,
235+
tolerate_missing_page_index: _,
235236
pruning: _,
236237
skip_metadata: _,
237238
metadata_size_hint: _,
@@ -507,6 +508,7 @@ mod tests {
507508

508509
// not in WriterProperties, but itemizing here to not skip newly added props
509510
enable_page_index: defaults.enable_page_index,
511+
tolerate_missing_page_index: defaults.tolerate_missing_page_index,
510512
pruning: defaults.pruning,
511513
skip_metadata: defaults.skip_metadata,
512514
metadata_size_hint: defaults.metadata_size_hint,
@@ -618,6 +620,8 @@ mod tests {
618620

619621
// not in WriterProperties
620622
enable_page_index: global_options_defaults.enable_page_index,
623+
tolerate_missing_page_index: global_options_defaults
624+
.tolerate_missing_page_index,
621625
pruning: global_options_defaults.pruning,
622626
skip_metadata: global_options_defaults.skip_metadata,
623627
metadata_size_hint: global_options_defaults.metadata_size_hint,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use log::debug;
4949
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
5050
use parquet::arrow::async_reader::AsyncFileReader;
5151
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
52-
use parquet::file::metadata::ParquetMetaDataReader;
52+
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
5353

5454
/// Implements [`FileOpener`] for a parquet file
5555
pub(super) struct ParquetOpener {
@@ -83,6 +83,8 @@ pub(super) struct ParquetOpener {
8383
/// Should the page index be read from parquet files, if present, to skip
8484
/// data pages
8585
pub enable_page_index: bool,
86+
/// Should the Parquet reader tolerate missing page indexes?
87+
pub tolerate_missing_page_index: bool,
8688
/// Should the bloom filter be read from parquet, if present, to skip row
8789
/// groups
8890
pub enable_bloom_filter: bool,
@@ -149,6 +151,8 @@ impl FileOpener for ParquetOpener {
149151
enable_page_index = false;
150152
}
151153

154+
let tolerate_missing_page_index = self.tolerate_missing_page_index;
155+
152156
Ok(Box::pin(async move {
153157
// Prune this file using the file level statistics and partition values.
154158
// Since dynamic filters may have been updated since planning it is possible that we are able
@@ -279,11 +283,16 @@ impl FileOpener for ParquetOpener {
279283
// code above may not have read the page index structures yet. If we
280284
// need them for reading and they aren't yet loaded, we need to load them now.
281285
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
286+
let page_index_policy = if tolerate_missing_page_index {
287+
PageIndexPolicy::Optional
288+
} else {
289+
PageIndexPolicy::Required
290+
};
282291
reader_metadata = load_page_index(
283292
reader_metadata,
284293
&mut async_file_reader,
285-
// Since we're manually loading the page index the option here should not matter but we pass it in for consistency
286-
options.with_page_index(true),
294+
options.with_page_index_policy(page_index_policy),
295+
page_index_policy,
287296
)
288297
.await?;
289298
}
@@ -483,6 +492,7 @@ async fn load_page_index<T: AsyncFileReader>(
483492
reader_metadata: ArrowReaderMetadata,
484493
input: &mut T,
485494
options: ArrowReaderOptions,
495+
page_index_policy: PageIndexPolicy,
486496
) -> Result<ArrowReaderMetadata> {
487497
let parquet_metadata = reader_metadata.metadata();
488498
let missing_column_index = parquet_metadata.column_index().is_none();
@@ -495,8 +505,9 @@ async fn load_page_index<T: AsyncFileReader>(
495505
if missing_column_index || missing_offset_index {
496506
let m = Arc::try_unwrap(Arc::clone(parquet_metadata))
497507
.unwrap_or_else(|e| e.as_ref().clone());
498-
let mut reader =
499-
ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
508+
let mut reader = ParquetMetaDataReader::new_with_metadata(m)
509+
.with_page_index_policy(page_index_policy);
510+
500511
reader.load_page_index(input).await?;
501512
let new_parquet_metadata = reader.finish()?;
502513
let new_arrow_reader =
@@ -651,6 +662,7 @@ mod test {
651662

652663
let make_opener = |predicate| {
653664
ParquetOpener {
665+
tolerate_missing_page_index: false,
654666
partition_index: 0,
655667
projection: Arc::new([0, 1]),
656668
batch_size: 1024,
@@ -733,6 +745,7 @@ mod test {
733745

734746
let make_opener = |predicate| {
735747
ParquetOpener {
748+
tolerate_missing_page_index: false,
736749
partition_index: 0,
737750
projection: Arc::new([0]),
738751
batch_size: 1024,
@@ -835,6 +848,7 @@ mod test {
835848
]));
836849
let make_opener = |predicate| {
837850
ParquetOpener {
851+
tolerate_missing_page_index: false,
838852
partition_index: 0,
839853
projection: Arc::new([0]),
840854
batch_size: 1024,
@@ -947,6 +961,7 @@ mod test {
947961

948962
let make_opener = |predicate| {
949963
ParquetOpener {
964+
tolerate_missing_page_index: false,
950965
partition_index: 0,
951966
projection: Arc::new([0]),
952967
batch_size: 1024,
@@ -1060,6 +1075,7 @@ mod test {
10601075

10611076
let make_opener = |predicate| {
10621077
ParquetOpener {
1078+
tolerate_missing_page_index: false,
10631079
partition_index: 0,
10641080
projection: Arc::new([0]),
10651081
batch_size: 1024,
@@ -1244,6 +1260,7 @@ mod test {
12441260
};
12451261

12461262
let make_opener = |predicate| ParquetOpener {
1263+
tolerate_missing_page_index: false,
12471264
partition_index: 0,
12481265
projection: Arc::new([0, 1]),
12491266
batch_size: 1024,

datafusion/datasource-parquet/src/source.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,26 @@ impl ParquetSource {
389389
self.table_parquet_options.global.enable_page_index
390390
}
391391

392+
/// If enabled, the reader will not error if
393+
/// the page index is missing from a parquet
394+
/// file and `enable_page_index` is true.
395+
pub fn with_tolerate_missing_page_index(
396+
mut self,
397+
tolerate_missing_page_index: bool,
398+
) -> Self {
399+
self.table_parquet_options
400+
.global
401+
.tolerate_missing_page_index = tolerate_missing_page_index;
402+
self
403+
}
404+
405+
/// Return the value described in [`Self::with_tolerate_missing_page_index`]
406+
fn tolerate_missing_page_index(&self) -> bool {
407+
self.table_parquet_options
408+
.global
409+
.tolerate_missing_page_index
410+
}
411+
392412
/// If enabled, the reader will read by the bloom filter
393413
pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
394414
self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
@@ -540,6 +560,7 @@ impl FileSource for ParquetSource {
540560
pushdown_filters: self.pushdown_filters(),
541561
reorder_filters: self.reorder_filters(),
542562
enable_page_index: self.enable_page_index(),
563+
tolerate_missing_page_index: self.tolerate_missing_page_index(),
543564
enable_bloom_filter: self.bloom_filter_on_read(),
544565
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
545566
schema_adapter_factory,

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,8 @@ message ParquetOptions {
554554
oneof coerce_int96_opt {
555555
string coerce_int96 = 32;
556556
}
557+
558+
bool tolerate_missing_page_index = 33; // default = false
557559
}
558560

559561
enum JoinSide {

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
913913
#[allow(deprecated)] // max_statistics_size
914914
Ok(ParquetOptions {
915915
enable_page_index: value.enable_page_index,
916+
tolerate_missing_page_index: value.tolerate_missing_page_index,
916917
pruning: value.pruning,
917918
skip_metadata: value.skip_metadata,
918919
metadata_size_hint: value

0 commit comments

Comments
 (0)