@@ -113,4 +113,9 @@ object TypeUtils {
case _ =>
}
}

def getMinMaxValue(dataType: DataType, values: Array[Any]): (Any, Any) = {
val sortedValues = values.sorted(getInterpretedOrdering(dataType))
(sortedValues.head, sortedValues.last)
}
}
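
For context, a minimal usage sketch of the new helper (hypothetical values; TypeUtils is Catalyst-internal API, so this is for illustration only):

import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types.IntegerType

// Sorts the values with the interpreted ordering of the given data type and returns the
// endpoints; these become the >= / <= bounds of the pushed-down range filter.
val (min, max) = TypeUtils.getMinMaxValue(IntegerType, Array[Any](7, -3, 42))
// min == -3, max == 42
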
@@ -704,8 +704,8 @@ object SQLConf {
val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
.doc("The maximum number of values to filter push-down optimization for IN predicate. " +
"Large threshold won't necessarily provide much better performance. " +
"The experiment argued that 300 is the limit threshold. " +
"Spark will push-down a value greater than or equal to its minimum value and " +
@gengliangwang (Member) commented on Nov 23, 2020:

I think the default value 10 is small here. What is the default threshold in Impala?

Member Author (@wangyum) replied:

Impala only optimizes it to >= minimum value and <= maximum value: apache/impala@aa05c64

"less than or equal to its maximum value if its value exceeds this threshold. " +
"By setting this value to 0 this feature can be disabled. " +
s"This configuration only has an effect when '${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is " +
"enabled.")
1,076 changes: 538 additions & 538 deletions sql/core/benchmarks/FilterPushdownBenchmark-results.txt

Large diffs are not rendered by default.

@@ -273,7 +273,8 @@ class ParquetFileFormat
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
val parquetFilters = new ParquetFilters(dataSchema, parquetSchema,
pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
@@ -33,15 +33,17 @@ import org.apache.parquet.schema.OriginalType._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, TypeUtils}
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
* Some utility function to convert Spark data source filters to Parquet filters.
*/
class ParquetFilters(
schema: MessageType,
sparkSchema: StructType,
parquetSchema: MessageType,
pushDownDate: Boolean,
pushDownTimestamp: Boolean,
pushDownDecimal: Boolean,
@@ -75,7 +77,7 @@ class ParquetFilters(
}
}

val primitiveFields = getPrimitiveFields(schema.getFields.asScala.toSeq).map { field =>
val primitiveFields = getPrimitiveFields(parquetSchema.getFields.asScala.toSeq).map { field =>
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
(field.fieldNames.toSeq.quoted, field)
}
@@ -597,12 +599,26 @@ class ParquetFilters(
createFilterHelper(pred, canPartialPushDownConjuncts = false)
.map(FilterApi.not)

case sources.In(name, values) if canMakeFilterOn(name, values.head)
&& values.distinct.length <= pushDownInFilterThreshold =>
values.distinct.flatMap { v =>
makeEq.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, v))
}.reduceLeftOption(FilterApi.or)
case sources.In(name, values) if pushDownInFilterThreshold > 0 &&
Member commented:

@wangyum, the Impala reference sounds good. Can we make it general and push the range filter to other data sources as well?

Member commented:

If this is beneficial in other sources as well, I think it makes more sense to push it down to them anyway.

Member Author (@wangyum) replied:

It seems Parquet is the only source where IN predicate pushdown is not well supported.
Parquet vs ORC:

OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 50, distribution: 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9281 9298 12 1.7 590.1 1.0X
Parquet Vectorized (Pushdown) 9546 9561 17 1.6 606.9 1.0X
Native ORC Vectorized 6877 6897 18 2.3 437.2 1.3X
Native ORC Vectorized (Pushdown) 661 668 15 23.8 42.0 14.0X
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 50, distribution: 90): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9322 9335 22 1.7 592.7 1.0X
Parquet Vectorized (Pushdown) 9551 9573 18 1.6 607.2 1.0X
Native ORC Vectorized 6902 6915 13 2.3 438.8 1.4X
Native ORC Vectorized (Pushdown) 659 680 25 23.9 41.9 14.1X
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 100, distribution: 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9278 9294 18 1.7 589.9 1.0X
Parquet Vectorized (Pushdown) 9520 9560 27 1.7 605.3 1.0X
Native ORC Vectorized 6855 6870 16 2.3 435.9 1.4X
Native ORC Vectorized (Pushdown) 795 808 16 19.8 50.5 11.7X
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 100, distribution: 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9306 9311 4 1.7 591.6 1.0X
Parquet Vectorized (Pushdown) 9529 9551 16 1.7 605.8 1.0X
Native ORC Vectorized 6875 6882 7 2.3 437.1 1.4X
Native ORC Vectorized (Pushdown) 853 865 15 18.4 54.2 10.9X
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 100, distribution: 90): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9256 9271 9 1.7 588.5 1.0X
Parquet Vectorized (Pushdown) 9500 9520 13 1.7 604.0 1.0X
Native ORC Vectorized 6843 6857 9 2.3 435.1 1.4X
Native ORC Vectorized (Pushdown) 858 870 14 18.3 54.6 10.8X

CSV:
#29642 (comment)
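
For reference, roughly the query shape these numbers measure, as a sketch (hypothetical path and column name, a SparkSession named spark assumed; the actual harness is FilterPushdownBenchmark):

import org.apache.spark.sql.functions.col

// A wide IN list on a numeric column read from Parquet, run with
// spark.sql.parquet.filterPushdown enabled ("Pushdown" rows) and disabled.
val inValues = (0 until 50).map(_ * 100)          // "values count: 50"
spark.read.parquet("/tmp/parquet_in_benchmark")   // hypothetical path
  .where(col("value").isin(inValues: _*))
  .count()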

values.nonEmpty && canMakeFilterOn(name, values.head) =>
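// Below the threshold, each value is pushed as an equality filter OR-ed together;
// above it, the predicate falls back to a single [min, max] range filter on the column.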
if (values.length <= pushDownInFilterThreshold) {
values.flatMap { v =>
makeEq.lift(nameToParquetField(name).fieldType)
.map(_(nameToParquetField(name).fieldNames, v))
}.reduceLeftOption(FilterApi.or)
} else {
sparkSchema.find { f =>
if (caseSensitive) f.name.equals(name) else f.name.equalsIgnoreCase(name)
}.map(_.dataType) match {
case Some(dataType) =>
val (min, max) = TypeUtils.getMinMaxValue(dataType, values)
createFilterHelper(
sources.And(sources.GreaterThanOrEqual(name, min),
sources.LessThanOrEqual(name, max)),
canPartialPushDownConjuncts)
case _ => None
}
}

case sources.StringStartsWith(name, prefix)
if pushDownStartWith && canMakeFilterOn(name, prefix) =>
@@ -138,7 +138,8 @@ case class ParquetPartitionReaderFactory(
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
val parquetFilters = new ParquetFilters(dataSchema, parquetSchema,
pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
@@ -51,7 +51,7 @@ case class ParquetScanBuilder(
val isCaseSensitive = sqlConf.caseSensitiveAnalysis
val parquetSchema =
new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
val parquetFilters = new ParquetFilters(schema, parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
parquetFilters.convertibleFilters(this.filters).toArray
}
@@ -28,7 +28,6 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
import org.apache.parquet.schema.MessageType

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
@@ -69,9 +68,10 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
abstract class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSparkSession {

protected def createParquetFilters(
schema: MessageType,
schema: StructType,
caseSensitive: Option[Boolean] = None): ParquetFilters =
new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
new ParquetFilters(schema, new SparkToParquetSchemaConverter(conf).convert(schema),
conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
conf.parquetFilterPushDownInFilterThreshold,
caseSensitive.getOrElse(conf.caseSensitiveAnalysis))
@@ -618,9 +618,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF
.write.format(dataSourceName).save(file.getCanonicalPath)
readParquetFile(file.getCanonicalPath) { df =>
val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
assertResult(None) {
createParquetFilters(schema).createFilter(sources.IsNull("_1"))
createParquetFilters(df.schema).createFilter(sources.IsNull("_1"))
}
}
}
@@ -684,14 +683,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
StructField("cdecimal3", DecimalType(DecimalType.MAX_PRECISION, scale))
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)

val decimal = new JBigDecimal(10).setScale(scale)
val decimal1 = new JBigDecimal(10).setScale(scale + 1)
assert(decimal.scale() === scale)
assert(decimal1.scale() === scale + 1)

val parquetFilters = createParquetFilters(parquetSchema)
val parquetFilters = createParquetFilters(schema)
assertResult(Some(lt(intColumn("cdecimal1"), 1000: Integer))) {
parquetFilters.createFilter(sources.LessThan("cdecimal1", decimal))
}
@@ -867,8 +864,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
StructField("c", DoubleType, nullable = true)
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
val parquetFilters = createParquetFilters(parquetSchema)
val parquetFilters = createParquetFilters(schema)
assertResult(Some(and(
lt(intColumn("a"), 10: Integer),
gt(doubleColumn("c"), 1.5: java.lang.Double)))
@@ -1011,8 +1007,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
StructField("c", DoubleType, nullable = true)
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
val parquetFilters = createParquetFilters(parquetSchema)
val parquetFilters = createParquetFilters(schema)
// Testing
// case sources.Or(lhs, rhs) =>
// ...
@@ -1066,8 +1061,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
StructField("c", DoubleType, nullable = true)
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
val parquetFilters = createParquetFilters(parquetSchema)
val parquetFilters = createParquetFilters(schema)
assertResult(Seq(sources.And(sources.LessThan("a", 10), sources.GreaterThan("c", 1.5D)))) {
parquetFilters.convertibleFilters(
Seq(sources.And(
@@ -1361,9 +1355,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
}

val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
assertResult(None) {
createParquetFilters(schema).createFilter(sources.StringStartsWith("_1", null))
createParquetFilters(df.schema).createFilter(sources.StringStartsWith("_1", null))
}
}

@@ -1387,8 +1380,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
StructField("a", IntegerType, nullable = false)
))

val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
val parquetFilters = createParquetFilters(parquetSchema)
val parquetFilters = createParquetFilters(schema)
assertResult(Some(FilterApi.eq(intColumn("a"), null: Integer))) {
parquetFilters.createFilter(sources.In("a", Array(null)))
}
@@ -1397,8 +1389,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
parquetFilters.createFilter(sources.In("a", Array(10)))
}

// Remove duplicates
assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
// Duplicate values should be handled by OptimizeIn
assertResult(Some(or(
FilterApi.eq(intColumn("a"), 10: Integer),
FilterApi.eq(intColumn("a"), 10: Integer)))
) {
parquetFilters.createFilter(sources.In("a", Array(10, 10)))
}

@@ -1413,7 +1408,16 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
assert(parquetFilters.createFilter(sources.In("a",
Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined)
assert(parquetFilters.createFilter(sources.In("a",
Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isEmpty)
Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isDefined)
assert(parquetFilters.createFilter(sources.In("a", Array.empty)).isEmpty)

assertResult(Some(and(
FilterApi.gtEq(intColumn("a"), -10: Integer),
FilterApi.ltEq(intColumn("a"), 100: Integer)))
) {
parquetFilters.createFilter(sources.In("a",
Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray ++ Array(100, -10, 70)))
}

import testImplicits._
withTempPath { path =>
@@ -1429,7 +1433,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
val filter = s"a in(${Range(0, count).mkString(",")})"
assert(df.where(filter).count() === count)
val actual = stripSparkFilter(df.where(filter)).collect().length
if (pushEnabled && count <= conf.parquetFilterPushDownInFilterThreshold) {
if (pushEnabled) {
assert(actual > 1 && actual < data.length)
} else {
assert(actual === data.length)
@@ -1448,11 +1452,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
schema: StructType,
expected: FilterPredicate,
filter: sources.Filter): Unit = {
val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
val caseSensitiveParquetFilters =
createParquetFilters(parquetSchema, caseSensitive = Some(true))
createParquetFilters(schema, caseSensitive = Some(true))
val caseInsensitiveParquetFilters =
createParquetFilters(parquetSchema, caseSensitive = Some(false))
createParquetFilters(schema, caseSensitive = Some(false))
assertResult(Some(expected)) {
caseInsensitiveParquetFilters.createFilter(filter)
}
@@ -1511,11 +1514,17 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
FilterApi.eq(intColumn("cint"), 20: Integer)),
sources.In("CINT", Array(10, 20)))

testCaseInsensitiveResolution(
schema,
FilterApi.and(
FilterApi.gtEq(intColumn("cint"), -10: Integer),
FilterApi.ltEq(intColumn("cint"), 20: Integer)),
sources.In("CINT", Array(-10, 20) ++ Range(0, conf.parquetFilterPushDownInFilterThreshold)))

val dupFieldSchema = StructType(
Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType)))
val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema)
val dupCaseInsensitiveParquetFilters =
createParquetFilters(dupParquetSchema, caseSensitive = Some(false))
createParquetFilters(dupFieldSchema, caseSensitive = Some(false))
assertResult(None) {
dupCaseInsensitiveParquetFilters.createFilter(sources.EqualTo("CINT", 1000))
}
@@ -1631,8 +1640,7 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
// "parquet" is in `NESTED_PREDICATE_PUSHDOWN_V1_SOURCE_LIST`.
if (nestedPredicatePushdown || !containsNestedColumnOrDot) {
assert(selectedFilters.nonEmpty, "No filter is pushed down")
val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
val parquetFilters = createParquetFilters(schema)
val parquetFilters = createParquetFilters(df.schema)
// In this test suite, all the simple predicates are convertible here.
assert(parquetFilters.convertibleFilters(selectedFilters) === selectedFilters)
val pushedParquetFilters = selectedFilters.map { pred =>
@@ -1691,8 +1699,7 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
val sourceFilters = filters.flatMap(DataSourceStrategy.translateFilter(_, true)).toArray
val pushedFilters = scan.pushedFilters
assert(pushedFilters.nonEmpty, "No filter is pushed down")
val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
val parquetFilters = createParquetFilters(schema)
val parquetFilters = createParquetFilters(df.schema)
// In this test suite, all the simple predicates are convertible here.
assert(parquetFilters.convertibleFilters(sourceFilters) === pushedFilters)
val pushedParquetFilters = pushedFilters.map { pred =>
@@ -745,9 +745,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {

case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
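// Same idea on the Hive metastore client side: an InSet larger than the threshold is
// converted to a [min, max] range before being sent as a partition-pruning filter.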
val dataType = child.dataType
val sortedValues = values.toSeq.sorted(TypeUtils.getInterpretedOrdering(dataType))
convert(And(GreaterThanOrEqual(child, Literal(sortedValues.head, dataType)),
LessThanOrEqual(child, Literal(sortedValues.last, dataType))))
val (min, max) = TypeUtils.getMinMaxValue(dataType, values.toArray)
convert(And(GreaterThanOrEqual(child, Literal(min, dataType)),
LessThanOrEqual(child, Literal(max, dataType))))

case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
if useAdvanced =>