
Commit ea75c33

KE-41399 spark-42388 Avoid parquet footer reads twice in vectorized reader (apache#623) (apache#629)

1 parent 19fef37 · commit ea75c33 · 5 files changed · 102 additions & 13 deletions
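The idea of the change: open one `ParquetFileReader` per file split and reuse the footer it has already parsed for both row-group filtering and the scan itself, rather than parsing the footer once for pushdown and a second time when the record reader initializes. A minimal stand-alone sketch of that pattern against plain parquet-mr 1.12 (the path and split bounds are placeholders, not taken from this commit):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterOnceSketch {
  // Open the reader once for a split [start, start + length); the footer it parsed
  // is then available via getFooter() and the same reader can drive the scan,
  // so the footer bytes are read from storage only once.
  public static ParquetFileReader openOnce(
      Configuration conf, Path path, long start, long length) throws IOException {
    ParquetReadOptions options = HadoopReadOptions.builder(conf, path)
        .withRange(start, start + length)
        .build();
    ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf), options);
    ParquetMetadata footer = reader.getFooter(); // cached; no extra footer I/O
    System.out.println("row groups in split: " + footer.getBlocks().size());
    return reader;
  }
}
```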


pom.xml

Lines changed: 1 addition & 1 deletion

```diff
@@ -138,7 +138,7 @@
     <kafka.version>2.8.2</kafka.version>
     <!-- After 10.15.1.3, the minimum required version is JDK9 -->
     <derby.version>10.14.2.0</derby.version>
-    <parquet.version>1.12.2-kylin-r5</parquet.version>
+    <parquet.version>1.12.2-kylin-r6</parquet.version>
     <orc.version>1.6.11</orc.version>
     <jetty.version>9.4.49.v20220914</jetty.version>
     <mortbay.jetty.version>7.0.0.pre5</mortbay.jetty.version>
```

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java

Lines changed: 52 additions & 2 deletions

```diff
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -26,14 +30,60 @@
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
-
-import java.io.IOException;
+import org.apache.spark.sql.execution.datasources.PartitionedFile;
 
 /**
  * `ParquetFooterReader` is a util class which encapsulates the helper
  * methods of reading parquet file footer
  */
 public class ParquetFooterReader {
+
+  public static final boolean SKIP_ROW_GROUPS = true;
+  public static final boolean WITH_ROW_GROUPS = false;
+
+  public static ParquetFileReader reader(
+      Configuration configuration,
+      PartitionedFile file) throws IOException, URISyntaxException {
+    long fileStart = file.start();
+    ParquetMetadataConverter.MetadataFilter filter;
+    Path path = new Path(new URI(file.filePath()));
+    filter = HadoopReadOptions.builder(configuration, path)
+        .withRange(fileStart, fileStart + file.length())
+        .build()
+        .getMetadataFilter();
+    HadoopInputFile inputFile = HadoopInputFile.fromPath(path, configuration);
+    ParquetReadOptions readOptions =
+        HadoopReadOptions.builder(inputFile.getConfiguration()).withMetadataFilter(filter).build();
+    return ParquetFileReader.open(inputFile, readOptions);
+  }
+
+  /**
+   * Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is true,
+   * this will skip reading the Parquet row group metadata.
+   *
+   * @param file a part (i.e. "block") of a single file that should be read
+   * @param configuration hadoop configuration of file
+   * @param skipRowGroup If true, skip reading row groups;
+   *                     if false, read row groups according to the file split range
+   */
+  public static ParquetMetadata readFooter(
+      Configuration configuration,
+      PartitionedFile file,
+      boolean skipRowGroup) throws IOException, URISyntaxException {
+    long fileStart = file.start();
+    ParquetMetadataConverter.MetadataFilter filter;
+    Path path = new Path(new URI(file.filePath()));
+    if (skipRowGroup) {
+      filter = ParquetMetadataConverter.SKIP_ROW_GROUPS;
+    } else {
+      filter = HadoopReadOptions.builder(configuration, path)
+          .withRange(fileStart, fileStart + file.length())
+          .build()
+          .getMetadataFilter();
+    }
+    return readFooter(configuration, path, filter);
+  }
+
   public static ParquetMetadata readFooter(Configuration configuration,
       Path file, ParquetMetadataConverter.MetadataFilter filter) throws IOException {
     return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
```
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 19 additions & 7 deletions

```diff
@@ -44,6 +44,7 @@
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.MessageType;
@@ -80,16 +81,27 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
 
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
+    initialize(inputSplit, taskAttemptContext, Option.empty());
+  }
+
+  public void initialize(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext,
+      Option<ParquetFileReader> fileReader)
+      throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     FileSplit split = (FileSplit) inputSplit;
     this.file = split.getPath();
-
-    ParquetReadOptions options = HadoopReadOptions
-      .builder(configuration)
-      .withRange(split.getStart(), split.getStart() + split.getLength())
-      .build();
-    this.reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
+    if (fileReader.isDefined()) {
+      this.reader = fileReader.get();
+    } else {
+      ParquetReadOptions options = HadoopReadOptions
+        .builder(configuration, file)
+        .withRange(split.getStart(), split.getStart() + split.getLength())
+        .build();
+      this.reader = new ParquetFileReader(
+        HadoopInputFile.fromPath(file, configuration), options);
+    }
     this.fileSchema = reader.getFileMetaData().getSchema();
     Map<String, String> fileMetadata = reader.getFileMetaData().getKeyValueMetaData();
     ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
```
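The shape of this change is simple reader injection: the original two-argument `initialize` keeps its contract and delegates with `Option.empty()`, while the new overload accepts an already-open reader and skips opening (and footer-parsing) a second one. A compact sketch of just that pattern, using `scala.Option` from Java as the diff does (the `Resource` type is a stand-in, not from the commit):

```java
import scala.Option;

// Stand-in for ParquetFileReader in this sketch.
interface Resource extends AutoCloseable {
  @Override void close();
}

public class InjectOrOpen {
  private Resource resource;

  // Legacy entry point: behavior unchanged, now just delegates.
  public void initialize() {
    initialize(Option.empty());
  }

  // New entry point: reuse an injected resource if present, else open our own.
  public void initialize(Option<Resource> injected) {
    this.resource = injected.isDefined() ? injected.get() : open();
  }

  private Resource open() {
    return () -> { }; // trivial resource; the real code opens a ParquetFileReader
  }
}
```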

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 14 additions & 0 deletions

```diff
@@ -27,6 +27,8 @@
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.QueryMetrics;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.Type;
 
 import org.apache.spark.memory.MemoryMode;
@@ -39,6 +41,8 @@
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
+import scala.Option;
+
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
  * Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader.
@@ -155,6 +159,16 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     initializeInternal();
   }
 
+  @Override
+  public void initialize(
+      InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext,
+      Option<ParquetFileReader> fileReader)
+      throws IOException, UnsupportedOperationException {
+    super.initialize(inputSplit, taskAttemptContext, fileReader);
+    initializeInternal();
+  }
+
   /**
    * Utility API that will read all the data in path. This circumvents the need to create Hadoop
    * objects to use this class. `columns` can contain the list of columns to project.
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 16 additions & 3 deletions

```diff
@@ -268,8 +268,18 @@ class ParquetFileFormat
 
       S3FileUtils.tryOpenClose(sharedConf, filePath)
       val startTime = System.currentTimeMillis()
-      lazy val footerFileMetaData =
-        ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      var fileReader = Option.empty[ParquetFileReader]
+      val fileFooter = if (enableVectorizedReader) {
+        // When there are vectorized reads, we can avoid reading the footer twice by reading
+        // all row groups in advance and filter row groups according to filters that require
+        // push down (no need to read the footer metadata again).
+        fileReader = Option.apply(ParquetFooterReader.reader(sharedConf, file))
+        fileReader.get.getFooter
+      } else {
+        ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
+      }
+      val footerFileMetaData = fileFooter.getFileMetaData
+
       val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         datetimeRebaseModeInRead)
@@ -322,6 +332,9 @@ class ParquetFileFormat
       // Notice: This push-down is RowGroups level, not individual records.
       if (pushed.isDefined) {
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+        if (fileReader.isDefined) {
+          fileReader.get.resetBlocks(hadoopAttemptContext.getConfiguration)
+        }
       }
       val taskContext = Option(TaskContext.get())
       val firstFooterEndTime = System.currentTimeMillis()
@@ -335,7 +348,7 @@ class ParquetFileFormat
           val iter = new RecordReaderIterator(vectorizedReader)
           // SPARK-23457 Register a task completion listener before `initialization`.
           taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-          vectorizedReader.initialize(split, hadoopAttemptContext)
+          vectorizedReader.initialize(split, hadoopAttemptContext, fileReader)
           logDebug(s"Appending $partitionSchema ${file.partitionValues}")
           vectorizedReader.initBatch(partitionSchema, file.partitionValues)
           if (returningBatch) {
```
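Taken together, the new task-side flow reads the footer once and threads the open reader through to the vectorized reader. A hedged, Java-ified sketch of that flow (the surrounding variables are assumed; note `resetBlocks(Configuration)` is not stock parquet-mr API — it presumably comes from the `1.12.2-kylin-r6` parquet fork bumped in pom.xml above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader;
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader;

import scala.Option;

public class SingleFooterScanSketch {
  // pushed may be null when no predicate could be pushed down.
  static void scan(
      Configuration conf,
      PartitionedFile file,
      InputSplit split,
      TaskAttemptContext context,
      FilterPredicate pushed,
      VectorizedParquetRecordReader vectorizedReader) throws Exception {
    ParquetFileReader reader = ParquetFooterReader.reader(conf, file); // one footer read
    ParquetMetadata footer = reader.getFooter();                       // reused, not re-read
    // ... derive rebase modes and the pushdown predicate from footer.getFileMetaData() ...
    if (pushed != null) {
      ParquetInputFormat.setFilterPredicate(conf, pushed);
      reader.resetBlocks(conf); // kylin-fork API: re-filter loaded row groups in place
    }
    // Hand the open reader down so initialize() skips its own footer parse.
    vectorizedReader.initialize(split, context, Option.apply(reader));
  }
}
```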
