Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
Can skip all row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Without nested predicate Pushdown 34214 35752 NaN 3.1 326.3 1.0X
With nested predicate Pushdown 86 102 11 1216.2 0.8 396.8X

OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
Can skip some row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Without nested predicate Pushdown 34211 35162 843 3.1 326.3 1.0X
With nested predicate Pushdown 3470 3514 36 30.2 33.1 9.9X

OpenJDK 64-Bit Server VM 11.0.2+9 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
Can skip no row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Without nested predicate Pushdown 37533 37919 329 2.8 357.9 1.0X
With nested predicate Pushdown 37876 39132 536 2.8 361.2 1.0X

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
Can skip all row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Without nested predicate Pushdown 30687 31552 NaN 3.4 292.7 1.0X
With nested predicate Pushdown 105 150 61 999.3 1.0 292.5X

OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
Can skip some row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Without nested predicate Pushdown 30505 31828 NaN 3.4 290.9 1.0X
With nested predicate Pushdown 3156 3215 77 33.2 30.1 9.7X

OpenJDK 64-Bit Server VM 1.8.0_252-b09 on Mac OS X 10.14.6
Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
Can skip no row groups: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Without nested predicate Pushdown 34475 35302 NaN 3.0 328.8 1.0X
With nested predicate Pushdown 34003 34596 567 3.1 324.3 1.0X

Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.benchmark

import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.internal.SQLConf

/**
* Synthetic benchmark for nested fields predicate push down performance for Parquet datasource.
* To run this benchmark:
* {{{
* 1. without sbt:
* bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar>
* 2. build/sbt "sql/test:runMain <this class>"
* 3. generate result:
* SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
* Results will be written to "benchmarks/ParquetNestedPredicatePushDownBenchmark-results.txt".
* }}}
*/
object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Apr 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, you need to switch to JDK8 HOME in your environment and run the above command once more. That will generate ParquetNestedPredicatePushDownBenchmark-results.txt additionally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot @dongjoon-hyun. I will run the benchmark with JDK8 and commit the report.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun , jdk8 benchmark results pushed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.


private val N = 100 * 1024 * 1024
private val NUMBER_OF_ITER = 10

override def getSparkSession: SparkSession = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you override it? What's the problem with default settings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaxGekk , thanks a lot, this was copied from FilterPushdownBenchmark.scala. Just realised the default setting also sets the master to local[1]...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

override method removed.

val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
// Since `spark.master` always exists, overrides this value
.set("spark.master", "local[1]")

SparkSession.builder().config(conf).getOrCreate()
}

private val df: DataFrame = spark
.range(1, N, 1, 4)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for creating 4 partitions (and 4 files) if you have only 1 CPU?

Copy link
Contributor Author

@JiJiTang JiJiTang Apr 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @MaxGekk, 4 partitions are here to make sure we have multiple row groups created for the small benchmark parquet dataset (as I didn't change parquet row group block size). Multiple partitions and 1 CPU to simulate a production scenario that we get a lot of partitions across limited number of executors with limited number of cores, with nest predicate pushed down we can have big performance gain since we don't need to read all the row groups. In this benchmark, since the data set is small, if put multiple CPUs, partitions will be read in parallel when nest predicate pushdown disabled, in which case we will not be able see a clear performance gain in terms of job execution time.

.toDF("id")
.selectExpr("id", "STRUCT(id x, STRUCT(CAST(id AS STRING) z) y) nested")
.sort("id")

private def addCase(
benchmark: Benchmark,
inputPath: String,
enableNestedPD: Boolean,
name: String,
withFilter: DataFrame => DataFrame): Unit = {
val loadDF = spark.read.parquet(inputPath)
benchmark.addCase(name) { _ =>
withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, enableNestedPD.toString)) {
withFilter(loadDF).noop()
}
}
}

private def createAndRunBenchmark(name: String, withFilter: DataFrame => DataFrame): Unit = {
withTempPath { tempDir =>
val outputPath = tempDir.getCanonicalPath
df.write.mode(SaveMode.Overwrite).parquet(outputPath)
val benchmark = new Benchmark(name, N, NUMBER_OF_ITER, output = output)
addCase(
benchmark,
outputPath,
enableNestedPD = false,
"Without nested predicate Pushdown",
withFilter)
addCase(
benchmark,
outputPath,
enableNestedPD = true,
"With nested predicate Pushdown",
withFilter)
benchmark.run()
}
}

/**
* Benchmark for sorted data with a filter which allows to filter out all the row groups
* when nested fields predicate push down enabled
*/
def runLoadNoRowGroupWhenPredicatePushedDown(): Unit = {
createAndRunBenchmark("Can skip all row groups", _.filter("nested.x < 0"))
}

/**
* Benchmark with a filter which allows to load only some row groups
* when nested fields predicate push down enabled
*/
def runLoadSomeRowGroupWhenPredicatePushedDown(): Unit = {
createAndRunBenchmark("Can skip some row groups", _.filter("nested.x = 100"))
}

/**
* Benchmark with a filter which still requires to
* load all the row groups on sorted data to see if we introduce too much
* overhead or not if enable nested predicate push down.
*/
def runLoadAllRowGroupsWhenPredicatePushedDown(): Unit = {
createAndRunBenchmark("Can skip no row groups", _.filter(s"nested.x >= 0 and nested.x <= $N"))
}

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runLoadNoRowGroupWhenPredicatePushedDown()
runLoadSomeRowGroupWhenPredicatePushedDown()
runLoadAllRowGroupsWhenPredicatePushedDown()
}
}