Commits (22)
5687a3b  Try to push down filter at RowGroups level for parquet reader. (viirya, May 28, 2016)
077f7f8  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Jun 9, 2016)
97ccacf  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Jun 14, 2016)
5711ae4  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Jun 16, 2016)
36fd059  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Jun 24, 2016)
687d75b  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Jul 7, 2016)
a8bae96  Add regression test. (viirya, Aug 3, 2016)
6c6fc69  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Aug 3, 2016)
50095a5  Don't need two SQLConf settings. (viirya, Aug 3, 2016)
246129c  Improve test cases and revert the change not needed now. (viirya, Aug 3, 2016)
58b4689  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Aug 4, 2016)
f7baf41  Improve test case. (viirya, Aug 5, 2016)
a52b354  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Aug 5, 2016)
3c7afaa  Add SQL metric for number of row groups for test purposes. (viirya, Aug 5, 2016)
ea81fdc  A few file formats are not serializable. (viirya, Aug 5, 2016)
2d34803  Improve the approach to update the accumulator. (viirya, Aug 6, 2016)
462edc7  Add a method in TaskMetrics to look up an accumulator by name. (viirya, Aug 8, 2016)
0b38ba1  Add comments. (viirya, Aug 9, 2016)
cee74b7  Remove unneeded changes. (viirya, Aug 9, 2016)
bbc5f7b  Merge remote-tracking branch 'upstream/master' into vectorized-reader… (viirya, Aug 10, 2016)
a2ba343  Prevent accumulator from being released early. (viirya, Aug 10, 2016)
ca074f1  Remove previous accumulator. (viirya, Aug 10, 2016)
@@ -225,6 +225,15 @@ class TaskMetrics private[spark] () extends Serializable {
}

private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums

/**
* Looks for a registered accumulator by accumulator name.
*/
private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
accumulators.find { acc =>
acc.name.isDefined && acc.name.get == name
}
}
}


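For orientation, here is a condensed sketch (not code from this diff) of how the new lookup is meant to be used on the executor side; the actual call site is in ParquetFileFormat further down. `rowGroupCount` is a stand-in for the value computed by the Parquet reader, and since lookForAccumulatorByName is private[spark], this only compiles inside Spark's own packages.

import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.metric.SQLMetric

// Inside a running task: find the externally registered "numRowGroups" metric,
// if any, and add this task's row-group count to it.
val rowGroupCount: Long = 3L  // stand-in for VectorizedParquetRecordReader.getRowGroupCount()
Option(TaskContext.get()).foreach { taskContext =>
  taskContext.taskMetrics
    .lookForAccumulatorByName("numRowGroups")
    .foreach(_.asInstanceOf[SQLMetric].add(rowGroupCount))
}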
@@ -82,6 +82,12 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
*/
protected long totalRowCount;

/**
* The total number of row groups this RecordReader will eventually read. Only used for
* test purposes.
*/
private int rowGroupCount;

protected ParquetFileReader reader;

@Override
@@ -144,8 +150,15 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
}
// For test purposes.
rowGroupCount = blocks.size();
}

/**
* Returns the total number of row groups to read. For test purposes only.
*/
public int getRowGroupCount() { return rowGroupCount; }

/**
* Returns the list of files at 'path' recursively. This skips files that are ignored normally
* by MapReduce.
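To make the two counters above concrete: `blocks` is the list of row groups the reader will actually read, so both numbers already reflect any predicate pushed down at the row-group level. A small Scala sketch of the same accounting over Parquet's block metadata (illustrative only; the real code is the Java above):

import scala.collection.JavaConverters._
import org.apache.parquet.hadoop.metadata.BlockMetaData

// Given the surviving row groups, compute what the reader will read. If a
// pushed-down predicate contradicts every row group's statistics, both values
// are 0; the test at the bottom of this diff asserts this for the row-group
// count via the "numRowGroups" metric.
def summarize(blocks: java.util.List[BlockMetaData]): (Long, Int) = {
  val totalRowCount = blocks.asScala.map(_.getRowCount).sum  // rows to read
  val rowGroupCount = blocks.size()                          // row groups to read
  (totalRowCount, rowGroupCount)
}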
@@ -213,7 +213,9 @@ private[sql] case class FileSourceScanExec(

private[sql] override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
// Only for test purposes.
"numRowGroups" -> SQLMetrics.createMetric(sparkContext, "numRowGroups"))
Contributor:
Can we create this only for the unit test (i.e., create it manually in the test case)?


protected override def doExecute(): RDD[InternalRow] = {
if (supportsBatch) {
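For reference, tests read this metric back from the executed physical plan roughly as follows (mirroring the suite change at the end of this diff; `df` is a DataFrame that has already been collected):

import org.apache.spark.sql.execution.FileSourceScanExec

val scan = df.queryExecution.sparkPlan.collect {
  case f: FileSourceScanExec => f
}.head
// 0 when row-group push-down skipped everything; greater than 0 otherwise.
val numRowGroups = scan.longMetric("numRowGroups").value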
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType
import org.slf4j.bridge.SLF4JBridgeHandler

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -357,10 +358,27 @@ private[sql] class ParquetFileFormat
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

// Try to push down filters when filter push-down is enabled.
// Note: this push-down happens at the row-group level, not for individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val parquetReader = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")

// For test purposes.
// If the predefined accumulator exists, the number of row groups to be read is added
// to it, so test cases can check whether row groups were filtered.
val taskContext = TaskContext.get()
if (taskContext != null) {
val accu = taskContext.taskMetrics.lookForAccumulatorByName(accuNameForNumRowGroup)
if (accu.isDefined) {
accu.get.asInstanceOf[SQLMetric].add(vectorizedReader.getRowGroupCount().toLong)
Contributor:
We could move this into SpecificParquetRecordReaderBase to minimize the change.

}
}

vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
@@ -416,6 +434,9 @@ private[sql] class ParquetFileFormat
sqlContext.sessionState.newHadoopConf(),
options)
}

// Only for test purposes.
private val accuNameForNumRowGroup = "numRowGroups"
}
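For context, the `pushed` option used above is built earlier in buildReader, which this diff does not change. Below is a sketch of that construction, following the same pattern as the deleted initializeLocalJobFunc further down; the names `filters`, `dataSchema`, and `sparkSession` are illustrative stand-ins for buildReader's parameters.

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

// Convert Spark data source Filters into Parquet FilterPredicates and AND them
// together when spark.sql.parquet.filterPushdown is enabled. Not every Filter
// can be converted (createFilter returns an Option), hence the flatMap.
val pushed: Option[FilterPredicate] =
  if (sparkSession.sessionState.conf.parquetFilterPushDown) {
    filters
      .flatMap(ParquetFilters.createFilter(dataSchema, _))
      .reduceOption(FilterApi.and)
  } else {
    None
  }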

/**
@@ -564,85 +585,6 @@ private[sql] class ParquetOutputWriter(
}

private[sql] object ParquetFileFormat extends Logging {
/**
* If parquet's block size (row group size) setting is larger than the min split size,
* we use parquet's block size setting as the min split size. Otherwise, we will create
* tasks processing nothing (because a split does not cover the starting point of a
* parquet block). See https://issues.apache.org/jira/browse/SPARK-10143 for more information.
*/
private def overrideMinSplitSize(parquetBlockSize: Long, conf: Configuration): Unit = {
val minSplitSize =
math.max(
conf.getLong("mapred.min.split.size", 0L),
conf.getLong("mapreduce.input.fileinputformat.split.minsize", 0L))
if (parquetBlockSize > minSplitSize) {
val message =
s"Parquet's block size (row group size) is larger than " +
s"mapred.min.split.size/mapreduce.input.fileinputformat.split.minsize. Setting " +
s"mapred.min.split.size and mapreduce.input.fileinputformat.split.minsize to " +
s"$parquetBlockSize."
logDebug(message)
conf.set("mapred.min.split.size", parquetBlockSize.toString)
conf.set("mapreduce.input.fileinputformat.split.minsize", parquetBlockSize.toString)
}
}

/** This closure sets various Parquet configurations at both driver side and executor side. */
private[parquet] def initializeLocalJobFunc(
requiredColumns: Array[String],
filters: Array[Filter],
dataSchema: StructType,
parquetBlockSize: Long,
useMetadataCache: Boolean,
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
val conf = job.getConfiguration
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)

// Try to push down filters when filter push-down is enabled.
if (parquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(dataSchema, _))
.reduceOption(FilterApi.and)
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}

conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
ParquetSchemaConverter.checkFieldNames(requestedSchema).json
})

conf.set(
ParquetWriteSupport.SPARK_ROW_SCHEMA,
ParquetSchemaConverter.checkFieldNames(dataSchema).json)

// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)

// Sets flags for `CatalystSchemaConverter`
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)

overrideMinSplitSize(parquetBlockSize, conf)
}

/** This closure sets input paths at the driver side. */
private[parquet] def initializeDriverSideJobFunc(
inputFiles: Array[FileStatus],
parquetBlockSize: Long)(job: Job): Unit = {
// We set the input paths at the driver side.
logInfo(s"Reading Parquet file(s) from ${inputFiles.map(_.getPath).mkString(", ")}")
if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}

overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
}

private[parquet] def readSchema(
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {

@@ -17,16 +17,26 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.io.File
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.TaskID
import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
import org.apache.parquet.hadoop.ParquetInputFormat

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -368,73 +378,75 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
val pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

// If the "c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits. This is a Parquet issue (PARQUET-389).
val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
checkAnswer(
df,
Row(1, "1", null))

// The fields "a" and "c" only exist in one Parquet file.
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathThree = s"${dir.getCanonicalPath}/table3"
df.write.parquet(pathThree)

// We will remove the temporary metadata when writing Parquet file.
val schema = spark.read.parquet(pathThree).schema
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

val pathFour = s"${dir.getCanonicalPath}/table4"
val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
dfStruct.select(struct("a").as("s")).write.parquet(pathFour)

val pathFive = s"${dir.getCanonicalPath}/table5"
val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)

// If the "s.c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits.
val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
.selectExpr("s")
checkAnswer(dfStruct3, Row(Row(null, 1)))

// The fields "s.a" and "s.c" only exist in one Parquet file.
val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathSix = s"${dir.getCanonicalPath}/table6"
dfStruct3.write.parquet(pathSix)

// We will remove the temporary metadata when writing Parquet file.
val forPathSix = spark.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

// sanity test: make sure optional metadata field is not wrongly set.
val pathSeven = s"${dir.getCanonicalPath}/table7"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
val pathEight = s"${dir.getCanonicalPath}/table8"
(4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)

val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
checkAnswer(
df2,
Row(1, "1"))

// The fields "a" and "b" exist in both two Parquet files. No metadata is set.
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
Seq("true", "false").map { vectorized =>
Contributor:
Is this change related?

Member Author:
This adds a regression test for the optional-column case (columns that exist in only some of the Parquet files). Previously the test covered only the non-vectorized Parquet reader; now it also covers the vectorized reader. This PR originally contained related reader changes, but they were removed per the earlier discussion (now collapsed).

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
val pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

// If the "c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits. This is a Parquet issue (PARQUET-389).
val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
checkAnswer(
df,
Row(1, "1", null))

// The fields "a" and "c" only exist in one Parquet file.
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathThree = s"${dir.getCanonicalPath}/table3"
df.write.parquet(pathThree)

// We will remove the temporary metadata when writing Parquet file.
val schema = spark.read.parquet(pathThree).schema
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

val pathFour = s"${dir.getCanonicalPath}/table4"
val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
dfStruct.select(struct("a").as("s")).write.parquet(pathFour)

val pathFive = s"${dir.getCanonicalPath}/table5"
val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)

// If the "s.c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits.
val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
.selectExpr("s")
checkAnswer(dfStruct3, Row(Row(null, 1)))

// The fields "s.a" and "s.c" only exist in one Parquet file.
val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathSix = s"${dir.getCanonicalPath}/table6"
dfStruct3.write.parquet(pathSix)

// We will remove the temporary metadata when writing Parquet file.
val forPathSix = spark.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

// sanity test: make sure optional metadata field is not wrongly set.
val pathSeven = s"${dir.getCanonicalPath}/table7"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
val pathEight = s"${dir.getCanonicalPath}/table8"
(4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)

val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
checkAnswer(
df2,
Row(1, "1"))

// The fields "a" and "b" exist in both two Parquet files. No metadata is set.
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
}
}
}
}
@@ -527,4 +539,27 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
assert(df.filter("_1 IS NOT NULL").count() === 4)
}
}

test("Fiters should be pushed down for vectorized Parquet reader at row group level") {
Contributor:
What about the non-vectorized reader?

Member Author:
For the non-vectorized reader, we use Parquet's ParquetRecordReader and push the filters into it.

Contributor:
@viirya I mean that we could also add a test to check that we correctly push filters into ParquetRecordReader. Note that you're also resolving SPARK-16321 (#14465)?

Member Author (viirya, Aug 3, 2016):
We already have tests for the filters pushed down to ParquetRecordReader, but they work at the individual-record level. If you mean the row-group level: since ParquetRecordReader doesn't expose a row-group count, I don't think we can check whether the filter is applied at that level. Besides, that is ParquetRecordReader's own functionality, so it should be covered by unit tests in the Parquet project.

Member Author:
Yeah, since ParquetRecordReader also reads the pushed-down filters from the Configuration, I think this also fixes SPARK-16321.

import testImplicits._

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table"
(1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
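// Every value of "a" is 101, so each row group's statistics (min = max = 101)
// contradict the pushed predicate "a < 100": with push-down enabled the reader
// should skip all row groups (numRowGroups == 0), and with it disabled none are
// skipped (numRowGroups > 0).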

Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
val df = spark.read.parquet(path).filter("a < 100")
df.collect()
val source = df.queryExecution.sparkPlan.collect {
case f: FileSourceScanExec => f
}.head
assert(func(source.longMetric("numRowGroups").value))
}
}
}
}
}
}