diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index e7f196158ae92..fcd953125997a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -91,7 +91,7 @@ impl FileOpener for ParquetOpener { let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint); - let mut reader: Box = + let mut async_file_reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, file_meta, @@ -121,23 +121,40 @@ impl FileOpener for ParquetOpener { let enable_page_index = self.enable_page_index; Ok(Box::pin(async move { - // Don't load the page index yet - we will decide later if we need it - let options = ArrowReaderOptions::new().with_page_index(false); - + // Don't load the page index yet. Since it is not stored inline in + // the footer, loading the page index if it is not needed will do + // unnecessary I/O. We decide later if it is needed to evaluate the + // pruning predicates. Thus default to not requesting it from the + // underlying reader. + let mut options = ArrowReaderOptions::new().with_page_index(false); let mut metadata_timer = file_metrics.metadata_load_time.timer(); - let mut metadata = - ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; + + // Begin by loading the metadata from the underlying reader (note + // the returned metadata may actually include page indexes as some + // readers may return page indexes even when not requested -- for + // example when they are cached) + let mut reader_metadata = + ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone()) + .await?; + // Note about schemas: we are actually dealing with **3 different schemas** here: // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc. 
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to. // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains. - let mut physical_file_schema = Arc::clone(metadata.schema()); + let mut physical_file_schema = Arc::clone(reader_metadata.schema()); - // read with view types + // The schema loaded from the file may not be the same as the + // desired schema (for example if we want to instruct the parquet + // reader to read strings using Utf8View instead). Update if necessary if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &physical_file_schema) { physical_file_schema = Arc::new(merged); + options = options.with_schema(Arc::clone(&physical_file_schema)); + reader_metadata = ArrowReaderMetadata::try_new( + Arc::clone(reader_metadata.metadata()), + options.clone(), + )?; } // Build predicates for this specific file @@ -147,23 +164,25 @@ impl FileOpener for ParquetOpener { &predicate_creation_errors, ); - // Now check if we should load the page index + // The page index is not stored inline in the parquet footer so the + // code above may not have read the page index structures yet. If we + // need them for reading and they aren't yet loaded, we need to load them now. 
if should_enable_page_index(enable_page_index, &page_pruning_predicate) { - metadata = load_page_index( - metadata, - &mut reader, + reader_metadata = load_page_index( + reader_metadata, + &mut async_file_reader, // Since we're manually loading the page index the option here should not matter but we pass it in for consistency - ArrowReaderOptions::new() - .with_page_index(true) - .with_schema(Arc::clone(&physical_file_schema)), + options.with_page_index(true), ) .await?; } metadata_timer.stop(); - let mut builder = - ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata); + let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( + async_file_reader, + reader_metadata, + ); let (schema_mapping, adapted_projections) = schema_adapter.map_schema(&physical_file_schema)?; @@ -372,12 +391,14 @@ fn build_pruning_predicates( (pruning_predicate, Some(page_pruning_predicate)) } +/// Returns an `ArrowReaderMetadata` with the page index loaded, loading +/// it from the underlying `AsyncFileReader` if necessary. async fn load_page_index( - arrow_reader: ArrowReaderMetadata, + reader_metadata: ArrowReaderMetadata, input: &mut T, options: ArrowReaderOptions, ) -> Result { - let parquet_metadata = arrow_reader.metadata(); + let parquet_metadata = reader_metadata.metadata(); let missing_column_index = parquet_metadata.column_index().is_none(); let missing_offset_index = parquet_metadata.offset_index().is_none(); // You may ask yourself: why are we even checking if the page index is already loaded here? @@ -397,6 +418,6 @@ async fn load_page_index( Ok(new_arrow_reader) } else { // No need to load the page index again, just return the existing metadata - Ok(arrow_reader) + Ok(reader_metadata) } }