94 changes: 44 additions & 50 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
@@ -218,3 +218,11 @@ uninlined_format_args = "warn"
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)", "cfg(tarpaulin_include)"] }
unused_qualifications = "deny"
[patch.crates-io]
arrow = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
arrow-buffer = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
arrow-flight = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
arrow-ipc = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
arrow-ord = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
arrow-schema = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
parquet = { git = "https://github.com/spiceai/arrow-rs.git", rev = "53162ed30fe6a2ed219b0af4dbbcd5d14745d7c2" } # spiceai-55.2
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -430,6 +430,10 @@ config_namespace! {
/// rows decoded.
pub enable_page_index: bool, default = true

/// (reading) If true, the parquet reader will tolerate missing page index metadata
/// rather than erroring because page index metadata was expected but not found.
pub tolerate_missing_page_index: bool, default = false

/// (reading) If true, the parquet reader attempts to skip entire row groups based
/// on the predicate in the query and the metadata (min/max values) stored in
/// the parquet file
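The new `tolerate_missing_page_index` flag sits next to `enable_page_index` in the `ParquetOptions` namespace. A minimal sketch of how it could be toggled from user code, assuming it is surfaced under the usual `datafusion.execution.parquet.*` config key like its siblings (the key name and the `tokio` scaffolding are assumptions, not part of this diff):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Assumed key, mirroring sibling options such as
    // `datafusion.execution.parquet.enable_page_index`.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.tolerate_missing_page_index", true);
    let ctx = SessionContext::new_with_config(config);

    // The same setting expressed as SQL.
    ctx.sql("SET datafusion.execution.parquet.tolerate_missing_page_index = true")
        .await?;
    Ok(())
}
```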
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -228,6 +228,7 @@ impl ParquetOptions {

// not in WriterProperties
enable_page_index: _,
tolerate_missing_page_index: _,
pruning: _,
skip_metadata: _,
metadata_size_hint: _,
@@ -502,6 +503,7 @@ mod tests {

// not in WriterProperties, but itemizing here to not skip newly added props
enable_page_index: defaults.enable_page_index,
tolerate_missing_page_index: defaults.tolerate_missing_page_index,
pruning: defaults.pruning,
skip_metadata: defaults.skip_metadata,
metadata_size_hint: defaults.metadata_size_hint,
@@ -608,6 +610,8 @@

// not in WriterProperties
enable_page_index: global_options_defaults.enable_page_index,
tolerate_missing_page_index: global_options_defaults
.tolerate_missing_page_index,
pruning: global_options_defaults.pruning,
skip_metadata: global_options_defaults.skip_metadata,
metadata_size_hint: global_options_defaults.metadata_size_hint,
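The writer-side tests above enumerate every `ParquetOptions` field by name, including the ones with no `WriterProperties` counterpart, precisely so that adding a field like `tolerate_missing_page_index` fails to compile until the tests acknowledge it. A toy sketch of that exhaustive-destructuring pattern (the `Opts` struct and field names below are made up for illustration):

```rust
// Made-up struct standing in for `ParquetOptions`.
struct Opts {
    enable_page_index: bool,
    tolerate_missing_page_index: bool,
}

fn summarize(opts: &Opts) -> String {
    // No `..` rest pattern: adding a field to `Opts` turns this destructuring
    // into a compile error until the new field is listed here as well.
    let Opts {
        enable_page_index,
        tolerate_missing_page_index,
    } = opts;
    format!("page_index={enable_page_index}, tolerate_missing={tolerate_missing_page_index}")
}

fn main() {
    let opts = Opts { enable_page_index: true, tolerate_missing_page_index: false };
    println!("{}", summarize(&opts));
}
```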
20 changes: 15 additions & 5 deletions datafusion/datasource-parquet/src/opener.rs
@@ -41,7 +41,7 @@ use log::debug;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};

/// Implements [`FileOpener`] for a parquet file
pub(super) struct ParquetOpener {
@@ -73,6 +73,8 @@ pub(super) struct ParquetOpener {
/// Should the page index be read from parquet files, if present, to skip
/// data pages
pub enable_page_index: bool,
/// Should the Parquet reader tolerate missing page indexes?
pub tolerate_missing_page_index: bool,
/// Should the bloom filter be read from parquet, if present, to skip row
/// groups
pub enable_bloom_filter: bool,
@@ -123,6 +125,7 @@ impl FileOpener for ParquetOpener {
.global_counter("num_predicate_creation_errors");

let enable_page_index = self.enable_page_index;
let tolerate_missing_page_index = self.tolerate_missing_page_index;

Ok(Box::pin(async move {
// Don't load the page index yet. Since it is not stored inline in
@@ -190,11 +193,16 @@
// code above may not have read the page index structures yet. If we
// need them for reading and they aren't yet loaded, we need to load them now.
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
let page_index_policy = if tolerate_missing_page_index {
PageIndexPolicy::Optional
} else {
PageIndexPolicy::Required
};
reader_metadata = load_page_index(
reader_metadata,
&mut async_file_reader,
// Since we're manually loading the page index, the option here should not matter, but we pass it in for consistency
options.with_page_index(true),
options.with_page_index_policy(page_index_policy),
page_index_policy,
)
.await?;
}
@@ -418,6 +426,7 @@ async fn load_page_index<T: AsyncFileReader>(
reader_metadata: ArrowReaderMetadata,
input: &mut T,
options: ArrowReaderOptions,
page_index_policy: PageIndexPolicy,
) -> Result<ArrowReaderMetadata> {
let parquet_metadata = reader_metadata.metadata();
let missing_column_index = parquet_metadata.column_index().is_none();
@@ -430,8 +439,9 @@
if missing_column_index || missing_offset_index {
let m = Arc::try_unwrap(Arc::clone(parquet_metadata))
.unwrap_or_else(|e| e.as_ref().clone());
let mut reader =
ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
let mut reader = ParquetMetaDataReader::new_with_metadata(m)
.with_page_index_policy(page_index_policy);

reader.load_page_index(input).await?;
let new_parquet_metadata = reader.finish()?;
let new_arrow_reader =
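The opener change boils down to translating the boolean into parquet's `PageIndexPolicy` before the page index is loaded: `Optional` tolerates writers that skipped the column/offset indexes, while `Required` keeps the existing failure mode. A self-contained sketch of that mapping against `ParquetMetaDataReader`, assuming a parquet crate that exposes `PageIndexPolicy` (here supplied by the pinned `spiceai-55.2` arrow-rs revision patched in above):

```rust
use bytes::Bytes;
use parquet::errors::Result;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};

/// Read footer metadata from an in-memory parquet file, requesting the page
/// index with a policy derived from the tolerance flag.
fn read_metadata(data: Bytes, tolerate_missing_page_index: bool) -> Result<ParquetMetaData> {
    let policy = if tolerate_missing_page_index {
        PageIndexPolicy::Optional // absent column/offset indexes are not an error
    } else {
        PageIndexPolicy::Required // error out, matching the pre-existing behavior
    };
    let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
    reader.try_parse(&data)?;
    reader.finish()
}
```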
21 changes: 21 additions & 0 deletions datafusion/datasource-parquet/src/source.rs
@@ -404,6 +404,26 @@ impl ParquetSource {
self.table_parquet_options.global.enable_page_index
}

/// If enabled, the reader will not error if
/// the page index is missing from a parquet
/// file and `enable_page_index` is true.
pub fn with_tolerate_missing_page_index(
mut self,
tolerate_missing_page_index: bool,
) -> Self {
self.table_parquet_options
.global
.tolerate_missing_page_index = tolerate_missing_page_index;
self
}

/// Return the value described in [`Self::with_tolerate_missing_page_index`]
fn tolerate_missing_page_index(&self) -> bool {
self.table_parquet_options
.global
.tolerate_missing_page_index
}

/// If enabled, the reader will read by the bloom filter
pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
@@ -515,6 +535,7 @@ impl FileSource for ParquetSource {
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
tolerate_missing_page_index: self.tolerate_missing_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
schema_adapter_factory,
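For programmatic construction of a scan, the builder added above chains onto `ParquetSource` like the other `with_*` setters. A hedged sketch (the import paths assume the `datafusion` facade crate with its `parquet` feature; wiring the source into a `FileScanConfig` or listing table is outside this diff):

```rust
use datafusion::common::config::TableParquetOptions;
use datafusion::datasource::physical_plan::ParquetSource;

/// Build a parquet source that skips page-index pruning for files whose
/// writers omitted the page index, instead of failing the query.
fn tolerant_parquet_source() -> ParquetSource {
    // `enable_page_index` already defaults to true (see the config.rs change
    // above), so only the new tolerance flag needs to be set explicitly.
    ParquetSource::new(TableParquetOptions::default())
        .with_tolerate_missing_page_index(true)
}
```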
2 changes: 2 additions & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
@@ -548,6 +548,8 @@ message ParquetOptions {
oneof coerce_int96_opt {
string coerce_int96 = 32;
}

bool tolerate_missing_page_index = 33; // default = false
}

enum JoinSide {
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
@@ -913,6 +913,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
#[allow(deprecated)] // max_statistics_size
Ok(ParquetOptions {
enable_page_index: value.enable_page_index,
tolerate_missing_page_index: value.tolerate_missing_page_index,
pruning: value.pruning,
skip_metadata: value.skip_metadata,
metadata_size_hint: value