diff --git a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt new file mode 100644 index 000000000000..c33f8a372d82 --- /dev/null +++ b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-jdk11-results.txt @@ -0,0 +1,21 @@ +OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +Can skip all row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Without nested predicate Pushdown 34214 35752 NaN 3.1 326.3 1.0X +With nested predicate Pushdown 86 102 11 1216.2 0.8 396.8X + +OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +Can skip some row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Without nested predicate Pushdown 34211 35162 843 3.1 326.3 1.0X +With nested predicate Pushdown 3470 3514 36 30.2 33.1 9.9X + +OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +Can skip no row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Without nested predicate Pushdown 37533 37919 329 2.8 357.9 1.0X +With nested predicate Pushdown 37876 39132 536 2.8 361.2 1.0X + diff --git a/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt new file mode 100644 index 000000000000..35dd4f0a5e9c --- /dev/null +++ b/sql/core/benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt @@ -0,0 +1,21 @@ +OpenJDK 
64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +Can skip all row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Without nested predicate Pushdown 30687 31552 NaN 3.4 292.7 1.0X +With nested predicate Pushdown 105 150 61 999.3 1.0 292.5X + +OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +Can skip some row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Without nested predicate Pushdown 30505 31828 NaN 3.4 290.9 1.0X +With nested predicate Pushdown 3156 3215 77 33.2 30.1 9.7X + +OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6 +Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +Can skip no row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Without nested predicate Pushdown 34475 35302 NaN 3.0 328.8 1.0X +With nested predicate Pushdown 34003 34596 567 3.1 324.3 1.0X + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala new file mode 100644 index 000000000000..11bc91a4b155 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ParquetNestedPredicatePushDownBenchmark.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.internal.SQLConf + +/** + * Synthetic benchmark for nested fields predicate push down performance for Parquet datasource. + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar> + * 2. build/sbt "sql/test:runMain <this class>" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" + * Results will be written to "benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt". 
+ * }}}
+ */
+object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark {
+
+  // Exclusive end of the generated id range; also passed to Benchmark and used in one filter.
+  private val numRows = 100 * 1024 * 1024
+  // Third constructor argument of every Benchmark created below.
+  private val numIters = 10
+
+  // Input data: ids starting at 1, plus a struct column `nested` with x = id and
+  // y.z = id cast to string, sorted by id.
+  private val df: DataFrame = {
+    val ids = spark.range(1, numRows, 1, 4).toDF("id")
+    val withNested =
+      ids.selectExpr("id", "STRUCT(id x, STRUCT(CAST(id AS STRING) z) y) nested")
+    withNested.sort("id")
+  }
+
+  /**
+   * Registers one case named `name` on `benchmark`. The case applies `withFilter` to the
+   * Parquet data read from `inputPath` and calls `noop()` on the result, with the nested
+   * predicate pushdown conf set to `enableNestedPD` for the duration of the call.
+   */
+  private def addCase(
+      benchmark: Benchmark,
+      inputPath: String,
+      enableNestedPD: Boolean,
+      name: String,
+      withFilter: DataFrame => DataFrame): Unit = {
+    // The reader is created when the case is registered, not inside the closure given
+    // to the benchmark.
+    val parquetDF = spark.read.parquet(inputPath)
+    val pushDownConf = SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key -> enableNestedPD.toString
+    benchmark.addCase(name) { _ =>
+      withSQLConf(pushDownConf) {
+        withFilter(parquetDF).noop()
+      }
+    }
+  }
+
+  /**
+   * Writes `df` as Parquet to a temporary path, registers two cases on a benchmark called
+   * `name` (pushdown disabled first, then enabled), and runs the benchmark.
+   */
+  private def createAndRunBenchmark(name: String, withFilter: DataFrame => DataFrame): Unit = {
+    withTempPath { tempDir =>
+      val parquetPath = tempDir.getCanonicalPath
+      df.write.mode(SaveMode.Overwrite).parquet(parquetPath)
+
+      val benchmark = new Benchmark(name, numRows, numIters, output = output)
+      // Registration order is kept: the baseline without pushdown comes first.
+      val cases = Seq(
+        false -> "Without nested predicate Pushdown",
+        true -> "With nested predicate Pushdown")
+      cases.foreach { case (pushDownEnabled, caseName) =>
+        addCase(benchmark, parquetPath, pushDownEnabled, caseName, withFilter)
+      }
+      benchmark.run()
+    }
+  }
+
+  /**
+   * Benchmark on the sorted data with a filter that no row satisfies (ids start at 1), so
+   * that all row groups can be filtered out when nested fields predicate push down is enabled.
+   */
+  def runLoadNoRowGroupWhenPredicatePushedDown(): Unit = {
+    val matchNothing: DataFrame => DataFrame = _.filter("nested.x < 0")
+    createAndRunBenchmark("Can skip all row groups", matchNothing)
+  }
+
+  /**
+   * Benchmark with an equality filter on a single value, so that only some row groups have
+   * to be loaded when nested fields predicate push down is enabled.
+   */
+  def runLoadSomeRowGroupWhenPredicatePushedDown(): Unit = {
+    val matchOneValue: DataFrame => DataFrame = _.filter("nested.x = 100")
+    createAndRunBenchmark("Can skip some row groups", matchOneValue)
+  }
+
+  /**
+   * Benchmark with a filter that still requires loading all the row groups of the sorted
+   * data, to check whether enabling nested predicate push down adds too much overhead.
+   */
+  def runLoadAllRowGroupsWhenPredicatePushedDown(): Unit = {
+    val matchEverything: DataFrame => DataFrame =
+      _.filter(s"nested.x >= 0 and nested.x <= $numRows")
+    createAndRunBenchmark("Can skip no row groups", matchEverything)
+  }
+
+  /** Runs the three benchmarks in order: skip all, skip some, skip no row groups. */
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val suites: Seq[() => Unit] = Seq(
+      () => runLoadNoRowGroupWhenPredicatePushedDown(),
+      () => runLoadSomeRowGroupWhenPredicatePushedDown(),
+      () => runLoadAllRowGroupsWhenPredicatePushedDown())
+    suites.foreach(run => run())
+  }
+}