Skip to content

Commit e1de341

Browse files
wangyum authored and HyukjinKwon committed
[SPARK-17091][SQL] Add rule to convert IN predicate to equivalent Parquet filter
## What changes were proposed in this pull request?

The original PR is apache#18424.

Add a new optimizer rule to convert an IN predicate to an equivalent Parquet filter, and add `spark.sql.parquet.pushdown.inFilterThreshold` to control the limit threshold. Different data types have different limit thresholds; the following data is copied here for reference:

| Type | Limit threshold |
| -- | -- |
| string | 370 |
| int | 210 |
| long | 285 |
| double | 270 |
| float | 220 |
| decimal | Won't provide better performance before [SPARK-24549](https://issues.apache.org/jira/browse/SPARK-24549) |

## How was this patch tested?

Unit tests and manual tests.

Author: Yuming Wang <[email protected]>

Closes apache#21603 from wangyum/SPARK-17091.
1 parent f1a99ad commit e1de341

5 files changed

Lines changed: 153 additions & 59 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,18 @@ object SQLConf {
386386
.booleanConf
387387
.createWithDefault(true)
388388

389+
// Config entry for "spark.sql.parquet.pushdown.inFilterThreshold": the largest number of
// IN-list values for which an IN predicate is still converted to a Parquet filter.
// It is an internal, integer-valued setting; negative values are rejected by `checkValue`,
// and the default is 10.
// NOTE(review): the doc text cites 300 as the experimentally found limit while the default
// is 10 -- confirm that the conservative default is intentional.
val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
390+
buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
391+
.doc("The maximum number of values to filter push-down optimization for IN predicate. " +
392+
"Large threshold won't necessarily provide much better performance. " +
393+
"The experiment argued that 300 is the limit threshold. " +
394+
"By setting this value to 0 this feature can be disabled. " +
395+
"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
396+
.internal()
397+
.intConf
398+
.checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
399+
.createWithDefault(10)
400+
389401
val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
390402
.doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
391403
"versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1485,6 +1497,9 @@ class SQLConf extends Serializable with Logging {
14851497
// Accessor: reads the current value of PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED
// from this conf via getConf.
def parquetFilterPushDownStringStartWith: Boolean =
14861498
getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
14871499

1500+
// Accessor: reads the current value of PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD
// ("spark.sql.parquet.pushdown.inFilterThreshold") from this conf via getConf.
def parquetFilterPushDownInFilterThreshold: Int =
1501+
getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
1502+
14881503
// Accessor: reads the current value of ORC_FILTER_PUSHDOWN_ENABLED from this conf.
def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
14891504

14901505
// Accessor: reads the current value of HIVE_VERIFY_PARTITION_PATH from this conf.
def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

sql/core/benchmarks/FilterPushdownBenchmark-results.txt

Lines changed: 48 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -417,120 +417,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
417417

418418
InSet -> InFilters (values count: 5, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
419419
------------------------------------------------------------------------------------------------
420-
Parquet Vectorized 7477 / 7587 2.1 475.4 1.0X
421-
Parquet Vectorized (Pushdown) 7862 / 8346 2.0 499.9 1.0X
422-
Native ORC Vectorized 6447 / 7021 2.4 409.9 1.2X
423-
Native ORC Vectorized (Pushdown) 983 / 1003 16.0 62.5 7.6X
420+
Parquet Vectorized 7993 / 8104 2.0 508.2 1.0X
421+
Parquet Vectorized (Pushdown) 507 / 532 31.0 32.2 15.8X
422+
Native ORC Vectorized 6922 / 7163 2.3 440.1 1.2X
423+
Native ORC Vectorized (Pushdown) 1017 / 1058 15.5 64.6 7.9X
424424

425425
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
426426
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
427427

428428
InSet -> InFilters (values count: 5, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
429429
------------------------------------------------------------------------------------------------
430-
Parquet Vectorized 7107 / 7290 2.2 451.9 1.0X
431-
Parquet Vectorized (Pushdown) 7196 / 7258 2.2 457.5 1.0X
432-
Native ORC Vectorized 6102 / 6222 2.6 388.0 1.2X
433-
Native ORC Vectorized (Pushdown) 926 / 958 17.0 58.9 7.7X
430+
Parquet Vectorized 7855 / 7963 2.0 499.4 1.0X
431+
Parquet Vectorized (Pushdown) 503 / 516 31.3 32.0 15.6X
432+
Native ORC Vectorized 6825 / 6954 2.3 433.9 1.2X
433+
Native ORC Vectorized (Pushdown) 1019 / 1044 15.4 64.8 7.7X
434434

435435
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
436436
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
437437

438438
InSet -> InFilters (values count: 5, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
439439
------------------------------------------------------------------------------------------------
440-
Parquet Vectorized 7374 / 7692 2.1 468.8 1.0X
441-
Parquet Vectorized (Pushdown) 7771 / 7848 2.0 494.1 0.9X
442-
Native ORC Vectorized 6184 / 6356 2.5 393.2 1.2X
443-
Native ORC Vectorized (Pushdown) 920 / 963 17.1 58.5 8.0X
440+
Parquet Vectorized 7858 / 7928 2.0 499.6 1.0X
441+
Parquet Vectorized (Pushdown) 490 / 519 32.1 31.1 16.0X
442+
Native ORC Vectorized 7079 / 7966 2.2 450.1 1.1X
443+
Native ORC Vectorized (Pushdown) 1276 / 1673 12.3 81.1 6.2X
444444

445445
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
446446
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
447447

448448
InSet -> InFilters (values count: 10, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
449449
------------------------------------------------------------------------------------------------
450-
Parquet Vectorized 7073 / 7326 2.2 449.7 1.0X
451-
Parquet Vectorized (Pushdown) 7304 / 7647 2.2 464.4 1.0X
452-
Native ORC Vectorized 6222 / 6579 2.5 395.6 1.1X
453-
Native ORC Vectorized (Pushdown) 958 / 994 16.4 60.9 7.4X
450+
Parquet Vectorized 8007 / 11155 2.0 509.0 1.0X
451+
Parquet Vectorized (Pushdown) 519 / 540 30.3 33.0 15.4X
452+
Native ORC Vectorized 6848 / 7072 2.3 435.4 1.2X
453+
Native ORC Vectorized (Pushdown) 1026 / 1050 15.3 65.2 7.8X
454454

455455
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
456456
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
457457

458458
InSet -> InFilters (values count: 10, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
459459
------------------------------------------------------------------------------------------------
460-
Parquet Vectorized 7121 / 7501 2.2 452.7 1.0X
461-
Parquet Vectorized (Pushdown) 7751 / 8334 2.0 492.8 0.9X
462-
Native ORC Vectorized 6225 / 6680 2.5 395.8 1.1X
463-
Native ORC Vectorized (Pushdown) 998 / 1020 15.8 63.5 7.1X
460+
Parquet Vectorized 7876 / 7956 2.0 500.7 1.0X
461+
Parquet Vectorized (Pushdown) 521 / 535 30.2 33.1 15.1X
462+
Native ORC Vectorized 7051 / 7368 2.2 448.3 1.1X
463+
Native ORC Vectorized (Pushdown) 1014 / 1035 15.5 64.5 7.8X
464464

465465
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
466466
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
467467

468468
InSet -> InFilters (values count: 10, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
469469
------------------------------------------------------------------------------------------------
470-
Parquet Vectorized 7157 / 7399 2.2 455.1 1.0X
471-
Parquet Vectorized (Pushdown) 7806 / 7911 2.0 496.3 0.9X
472-
Native ORC Vectorized 6548 / 6720 2.4 416.3 1.1X
473-
Native ORC Vectorized (Pushdown) 1016 / 1050 15.5 64.6 7.0X
470+
Parquet Vectorized 7897 / 8229 2.0 502.1 1.0X
471+
Parquet Vectorized (Pushdown) 513 / 530 30.7 32.6 15.4X
472+
Native ORC Vectorized 6730 / 6990 2.3 427.9 1.2X
473+
Native ORC Vectorized (Pushdown) 1003 / 1036 15.7 63.8 7.9X
474474

475475
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
476476
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
477477

478478
InSet -> InFilters (values count: 50, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
479479
------------------------------------------------------------------------------------------------
480-
Parquet Vectorized 7662 / 7805 2.1 487.1 1.0X
481-
Parquet Vectorized (Pushdown) 7590 / 7861 2.1 482.5 1.0X
482-
Native ORC Vectorized 6840 / 8073 2.3 434.9 1.1X
483-
Native ORC Vectorized (Pushdown) 1041 / 1075 15.1 66.2 7.4X
480+
Parquet Vectorized 7967 / 8175 2.0 506.5 1.0X
481+
Parquet Vectorized (Pushdown) 8155 / 8434 1.9 518.5 1.0X
482+
Native ORC Vectorized 7002 / 7107 2.2 445.2 1.1X
483+
Native ORC Vectorized (Pushdown) 1092 / 1139 14.4 69.4 7.3X
484484

485485
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
486486
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
487487

488488
InSet -> InFilters (values count: 50, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
489489
------------------------------------------------------------------------------------------------
490-
Parquet Vectorized 8230 / 9266 1.9 523.2 1.0X
491-
Parquet Vectorized (Pushdown) 7735 / 7960 2.0 491.8 1.1X
492-
Native ORC Vectorized 6945 / 7109 2.3 441.6 1.2X
493-
Native ORC Vectorized (Pushdown) 1123 / 1144 14.0 71.4 7.3X
490+
Parquet Vectorized 8032 / 8122 2.0 510.7 1.0X
491+
Parquet Vectorized (Pushdown) 8141 / 8908 1.9 517.6 1.0X
492+
Native ORC Vectorized 7140 / 7387 2.2 454.0 1.1X
493+
Native ORC Vectorized (Pushdown) 1156 / 1220 13.6 73.5 6.9X
494494

495495
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
496496
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
497497

498498
InSet -> InFilters (values count: 50, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
499499
------------------------------------------------------------------------------------------------
500-
Parquet Vectorized 7656 / 8058 2.1 486.7 1.0X
501-
Parquet Vectorized (Pushdown) 7860 / 8247 2.0 499.7 1.0X
502-
Native ORC Vectorized 6684 / 7003 2.4 424.9 1.1X
503-
Native ORC Vectorized (Pushdown) 1085 / 1172 14.5 69.0 7.1X
500+
Parquet Vectorized 8088 / 8350 1.9 514.2 1.0X
501+
Parquet Vectorized (Pushdown) 8629 / 8702 1.8 548.6 0.9X
502+
Native ORC Vectorized 7480 / 7886 2.1 475.6 1.1X
503+
Native ORC Vectorized (Pushdown) 1106 / 1145 14.2 70.3 7.3X
504504

505505
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
506506
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
507507

508508
InSet -> InFilters (values count: 100, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
509509
------------------------------------------------------------------------------------------------
510-
Parquet Vectorized 7594 / 8128 2.1 482.8 1.0X
511-
Parquet Vectorized (Pushdown) 7845 / 7923 2.0 498.8 1.0X
512-
Native ORC Vectorized 5859 / 6421 2.7 372.5 1.3X
513-
Native ORC Vectorized (Pushdown) 1037 / 1054 15.2 66.0 7.3X
510+
Parquet Vectorized 8028 / 8165 2.0 510.4 1.0X
511+
Parquet Vectorized (Pushdown) 8349 / 8674 1.9 530.8 1.0X
512+
Native ORC Vectorized 7107 / 7354 2.2 451.8 1.1X
513+
Native ORC Vectorized (Pushdown) 1175 / 1207 13.4 74.7 6.8X
514514

515515
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
516516
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
517517

518518
InSet -> InFilters (values count: 100, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
519519
------------------------------------------------------------------------------------------------
520-
Parquet Vectorized 6762 / 6775 2.3 429.9 1.0X
521-
Parquet Vectorized (Pushdown) 6911 / 6970 2.3 439.4 1.0X
522-
Native ORC Vectorized 5884 / 5960 2.7 374.1 1.1X
523-
Native ORC Vectorized (Pushdown) 1028 / 1052 15.3 65.4 6.6X
520+
Parquet Vectorized 8041 / 8195 2.0 511.2 1.0X
521+
Parquet Vectorized (Pushdown) 8466 / 8604 1.9 538.2 0.9X
522+
Native ORC Vectorized 7116 / 7286 2.2 452.4 1.1X
523+
Native ORC Vectorized (Pushdown) 1197 / 1214 13.1 76.1 6.7X
524524

525525
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
526526
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
527527

528528
InSet -> InFilters (values count: 100, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
529529
------------------------------------------------------------------------------------------------
530-
Parquet Vectorized 6718 / 6767 2.3 427.1 1.0X
531-
Parquet Vectorized (Pushdown) 6812 / 6909 2.3 433.1 1.0X
532-
Native ORC Vectorized 5842 / 5883 2.7 371.4 1.1X
533-
Native ORC Vectorized (Pushdown) 1040 / 1058 15.1 66.1 6.5X
530+
Parquet Vectorized 7998 / 8311 2.0 508.5 1.0X
531+
Parquet Vectorized (Pushdown) 9366 / 11257 1.7 595.5 0.9X
532+
Native ORC Vectorized 7856 / 9273 2.0 499.5 1.0X
533+
Native ORC Vectorized (Pushdown) 1350 / 1747 11.7 85.8 5.9X
534534

535535

536536
================================================================================================

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -334,17 +334,15 @@ class ParquetFileFormat
334334
val enableVectorizedReader: Boolean =
335335
sqlConf.parquetVectorizedReaderEnabled &&
336336
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
337-
val enableRecordFilter: Boolean =
338-
sparkSession.sessionState.conf.parquetRecordFilterEnabled
339-
val timestampConversion: Boolean =
340-
sparkSession.sessionState.conf.isParquetINT96TimestampConversion
337+
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
338+
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
341339
val capacity = sqlConf.parquetVectorizedReaderBatchSize
342-
val enableParquetFilterPushDown: Boolean =
343-
sparkSession.sessionState.conf.parquetFilterPushDown
340+
val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
344341
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
345342
val returningBatch = supportBatch(sparkSession, resultSchema)
346343
val pushDownDate = sqlConf.parquetFilterPushDownDate
347344
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
345+
val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
348346

349347
(file: PartitionedFile) => {
350348
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -368,12 +366,13 @@ class ParquetFileFormat
368366
val pushed = if (enableParquetFilterPushDown) {
369367
val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
370368
.getFileMetaData.getSchema
369+
val parquetFilters = new ParquetFilters(pushDownDate,
370+
pushDownStringStartWith, pushDownInFilterThreshold)
371371
filters
372372
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
373373
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
374374
// is used here.
375-
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
376-
.createFilter(parquetSchema, _))
375+
.flatMap(parquetFilters.createFilter(parquetSchema, _))
377376
.reduceOption(FilterApi.and)
378377
} else {
379378
None

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ import org.apache.spark.unsafe.types.UTF8String
3737
/**
3838
* Some utility function to convert Spark data source filters to Parquet filters.
3939
*/
40-
private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
40+
private[parquet] class ParquetFilters(
41+
pushDownDate: Boolean,
42+
pushDownStartWith: Boolean,
43+
pushDownInFilterThreshold: Int) {
4144

4245
private case class ParquetSchemaType(
4346
originalType: OriginalType,
@@ -232,6 +235,15 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
232235
// See SPARK-20364.
233236
// True only when `name` is present in `nameToType` and contains no '.' character.
def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")
234237

238+
// All DataTypes that support `makeEq` can provide better performance.
239+
// Decides, from the Parquet schema type that `nameToType` holds for `name`, whether an IN
// predicate on that column should be converted: boolean, byte, short, integer, long, float,
// double, string and binary columns always qualify; date columns qualify only when
// `pushDownDate` is set; every other type is rejected.
// NOTE(review): `nameToType(name)` is applied without a presence check, so callers presumably
// must test `canMakeFilterOn(name)` first -- confirm at each call site.
def shouldConvertInPredicate(name: String): Boolean = nameToType(name) match {
240+
case ParquetBooleanType | ParquetByteType | ParquetShortType | ParquetIntegerType
241+
| ParquetLongType | ParquetFloatType | ParquetDoubleType | ParquetStringType
242+
| ParquetBinaryType => true
243+
case ParquetDateType if pushDownDate => true
244+
case _ => false
245+
}
246+
235247
// NOTE:
236248
//
237249
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -295,6 +307,12 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith:
295307
case sources.Not(pred) =>
296308
createFilter(schema, pred).map(FilterApi.not)
297309

310+
case sources.In(name, values) if canMakeFilterOn(name) && shouldConvertInPredicate(name)
311+
&& values.distinct.length <= pushDownInFilterThreshold =>
312+
values.distinct.flatMap { v =>
313+
makeEq.lift(nameToType(name)).map(_(name, v))
314+
}.reduceLeftOption(FilterApi.or)
315+
298316
case sources.StringStartsWith(name, prefix) if pushDownStartWith && canMakeFilterOn(name) =>
299317
Option(prefix).map { v =>
300318
FilterApi.userDefined(binaryColumn(name),

0 commit comments

Comments
 (0)