80 changes: 80 additions & 0 deletions sql/core/benchmarks/JoinBenchmark-results.txt
@@ -0,0 +1,80 @@
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Join w long:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w long wholestage off                    3200 / 3226          6.6         152.6       1.0X
Join w long wholestage on                      223 /  261         94.2          10.6      14.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Join w long duplicated:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w long duplicated wholestage off         3727 / 3745          5.6         177.7       1.0X
Join w long duplicated wholestage on           212 /  233         98.8          10.1      17.6X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Join w 2 ints:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off              151556 / 152313          0.1        7226.8       1.0X
Join w 2 ints wholestage on               131599 / 138817          0.2        6275.1       1.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Join w 2 longs:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 longs wholestage off                 5608 / 6035          3.7         267.4       1.0X
Join w 2 longs wholestage on                  2256 / 2504          9.3         107.6       2.5X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

Join w 2 longs duplicated:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 longs duplicated wholestage off    10976 / 11120          1.9         523.4       1.0X
Join w 2 longs duplicated wholestage on       1781 / 1849         11.8          84.9       6.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

outer join w long:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
outer join w long wholestage off              2436 / 2453          8.6         116.1       1.0X
outer join w long wholestage on                140 /  144        149.4           6.7      17.3X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

semi join w long:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
semi join w long wholestage off               1610 / 1625         13.0          76.8       1.0X
semi join w long wholestage on                 142 /  148        148.0           6.8      11.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

merge join:                              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
merge join wholestage off                      864 /  910          2.4         412.0       1.0X
merge join wholestage on                       611 /  617          3.4         291.4       1.4X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

sort merge join with duplicates:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort merge join with duplicates wholestage off      1481 / 1597          1.4         706.2       1.0X
sort merge join with duplicates wholestage on       1192 / 1300          1.8         568.4       1.2X

Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz

shuffle hash join:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
shuffle hash join wholestage off              1012 / 1034          4.1         241.2       1.0X
shuffle hash join wholestage on                799 /  861          5.2         190.5       1.3X
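
The derived columns in these tables can be checked by hand. The figures are consistent with Rate(M/s) and Per Row(ns) being computed from the best time and the row count N passed to each benchmark (N = 20 << 20 for the broadcast hash joins), and with Relative being the first row's best time divided by the current row's best time. A minimal sketch of that arithmetic follows; the object and method names are hypothetical and are not part of Spark's benchmark utilities.

```scala
// Illustrative only: `BenchmarkColumnsSketch` and its methods are hypothetical names,
// not part of Spark's benchmark utilities.
object BenchmarkColumnsSketch {
  // Rows processed per second, in millions, based on the best run.
  def rateMillionsPerSec(numRows: Long, bestMs: Double): Double =
    numRows / (bestMs / 1000.0) / 1e6

  // Nanoseconds spent per row, based on the best run.
  def perRowNs(numRows: Long, bestMs: Double): Double =
    bestMs * 1e6 / numRows

  // Speed-up relative to the baseline (first) row of the table.
  def relative(baselineBestMs: Double, bestMs: Double): Double =
    baselineBestMs / bestMs

  def main(args: Array[String]): Unit = {
    val n = 20L << 20 // 20971520 rows, as in the "Join w long" case
    println(f"${rateMillionsPerSec(n, 3200)}%.1f M/s")
    println(f"${perRowNs(n, 3200)}%.1f ns")
    println(f"${relative(3200, 223)}%.1fX")
  }
}
```

Because the tables print rounded times, values recomputed from them can differ from the printed columns in the last digit.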

@@ -19,229 +19,161 @@ package org.apache.spark.sql.execution.benchmark

import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

/**
* Benchmark to measure performance for aggregate primitives.
Member

aggregate primitives -> joins

* To run this:
* build/sbt "sql/test-only *benchmark.JoinBenchmark"
*
* Benchmarks in this file are skipped in normal builds.
* To run this benchmark:
* {{{
* 1. without sbt:
* bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
* 2. build/sbt "sql/test:runMain <this class>"
* 3. generate result:
* SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
* Results will be written to "benchmarks/JoinBenchmark-results.txt".
* }}}
*/
class JoinBenchmark extends BenchmarkWithCodegen {
object JoinBenchmark extends SqlBasedBenchmark {

ignore("broadcast hash join, long key") {
def broadcastHashJoinLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16

val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
runBenchmark("Join w long", N) {
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
codegenBenchmark("Join w long", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}

/*
Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
Join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
Join w long codegen=false 3002 / 3262 7.0 143.2 1.0X
Join w long codegen=true 321 / 371 65.3 15.3 9.3X
*/
}

ignore("broadcast hash join, long key with duplicates") {
def broadcastHashJoinLongKeyWithDuplicates(): Unit = {
val N = 20 << 20
val M = 1 << 16

val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
Member

So, this removes the redundant definition, right?

Member Author

Yes

runBenchmark("Join w long duplicated", N) {
val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k"))
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
Member

For this change, we need to rerun the benchmark to get a new result.

codegenBenchmark("Join w long duplicated", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}

/*
*Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*Join w long duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*Join w long duplicated codegen=false 3446 / 3478 6.1 164.3 1.0X
*Join w long duplicated codegen=true 322 / 351 65.2 15.3 10.7X
*/
}

ignore("broadcast hash join, two int key") {
def broadcastHashJoinTwoIntKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
val dim2 = broadcast(sparkSession.range(M)
val dim2 = broadcast(spark.range(M)
.selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id as string) as v"))

runBenchmark("Join w 2 ints", N) {
val df = sparkSession.range(N).join(dim2,
codegenBenchmark("Join w 2 ints", N) {
val df = spark.range(N).join(dim2,
(col("id") % M).cast(IntegerType) === col("k1")
&& (col("id") % M).cast(IntegerType) === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}

/*
*Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*Join w 2 ints codegen=false 4426 / 4501 4.7 211.1 1.0X
*Join w 2 ints codegen=true 791 / 818 26.5 37.7 5.6X
*/
Member

@dongjoon-hyun, Oct 12, 2018

Hi, @cloud-fan, @gatorsmile, @davies, @rxin.

We are seeing a performance slowdown in this benchmark. However, this is not a regression, because the result is consistent across 2.0.2 ~ 2.4.0-rc3.

Join w 2 ints:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off              157742 / 158892          0.1        7521.7       1.0X
Join w 2 ints wholestage on               134290 / 152917          0.2        6403.4       1.2X

Judging by the original numbers, they seem to come from a time when HashJoin.rewriteKeyExpr used a simple upcast to bigint. The current code, however, produces a plan in which HashJoin.rewriteKeyExpr uses shiftleft operations.

scala> val df = spark.range(N).join(dim2, (col("id") % M).cast(IntegerType) === col("k1") && (col("id") % M).cast(IntegerType) === col("k2"))

scala> val df2 = spark.range(N).join(dim2, (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))

scala> df.explain
== Physical Plan ==
*(2) BroadcastHashJoin [cast((id#8L % 65536) as int), cast((id#8L % 65536) as int)], [k1#2, k2#3], Inner, BuildRight
:- *(2) Range (0, 20971520, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295))))
   +- *(1) Project [cast(id#0L as int) AS k1#2, cast(id#0L as int) AS k2#3, cast(id#0L as string) AS v#4]
      +- *(1) Range (0, 65536, step=1, splits=8)

scala> df2.explain
== Physical Plan ==
*(2) BroadcastHashJoin [(id#23L % 65536), (id#23L % 65536)], [cast(k1#2 as bigint), cast(k2#3 as bigint)], Inner, BuildRight
:- *(2) Range (0, 20971520, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint), cast(input[1, int, false] as bigint)))
   +- *(1) Project [cast(id#0L as int) AS k1#2, cast(id#0L as int) AS k2#3, cast(id#0L as string) AS v#4]
      +- *(1) Range (0, 65536, step=1, splits=8)

Did we really want to measure the difference in HashJoin.rewriteKeyExpr?
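
For readers following the thread: the shiftleft expression in the first plan packs the two int keys into a single bigint, while the second plan keeps two separate bigint keys. A minimal pure-Scala sketch of that packing is shown here. The object and method names are hypothetical, and this illustrates only the bit arithmetic visible in the plan, not Spark's actual implementation.

```scala
// Illustrative only: `PackedKeySketch`, `pack`, `high`, and `low` are hypothetical names,
// not Spark APIs. The expression mirrors the one printed in the first plan:
//   shiftleft(cast(k1 as bigint), 32) | (cast(k2 as bigint) & 4294967295)
object PackedKeySketch {
  // Put k1 in the upper 32 bits and k2 in the lower 32 bits of one Long.
  // The mask (4294967295 == 0xFFFFFFFFL) stops sign extension of a negative k2
  // from overwriting the bits that hold k1.
  def pack(k1: Int, k2: Int): Long =
    (k1.toLong << 32) | (k2.toLong & 0xFFFFFFFFL)

  // Recover the two halves; used here only to show that the packing is lossless.
  def high(key: Long): Int = (key >>> 32).toInt
  def low(key: Long): Int = key.toInt

  def main(args: Array[String]): Unit = {
    val key = pack(7, -1)
    assert(high(key) == 7 && low(key) == -1)
    // Swapping the two keys yields a different packed value.
    assert(pack(1, 2) != pack(2, 1))
  }
}
```

Packing gives the hash relation a single long key instead of two, at the cost of the extra shift and mask work on every row.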

Member

Any advice is welcome. Thank you in advance, @cloud-fan, @gatorsmile, @davies, @rxin.

Contributor

This seems to be caused by the bug fix in #15390.

So the performance is reasonable.

Member

Thank you for the confirmation, @cloud-fan!

}

ignore("broadcast hash join, two long key") {
def broadcastHashJoinTwoLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
val dim3 = broadcast(sparkSession.range(M)
val dim3 = broadcast(spark.range(M)
.selectExpr("id as k1", "id as k2", "cast(id as string) as v"))

runBenchmark("Join w 2 longs", N) {
val df = sparkSession.range(N).join(dim3,
codegenBenchmark("Join w 2 longs", N) {
val df = spark.range(N).join(dim3,
(col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}

/*
*Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*Join w 2 longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*Join w 2 longs codegen=false 5905 / 6123 3.6 281.6 1.0X
*Join w 2 longs codegen=true 2230 / 2529 9.4 106.3 2.6X
*/
}

ignore("broadcast hash join, two long key with duplicates") {
def broadcastHashJoinTwoLongKeyWithDuplicates(): Unit = {
val N = 20 << 20
val M = 1 << 16
val dim4 = broadcast(sparkSession.range(M)
val dim4 = broadcast(spark.range(M)
.selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as k2"))

runBenchmark("Join w 2 longs duplicated", N) {
val df = sparkSession.range(N).join(dim4,
codegenBenchmark("Join w 2 longs duplicated", N) {
val df = spark.range(N).join(dim4,
(col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}

/*
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*Join w 2 longs duplicated codegen=false 6420 / 6587 3.3 306.1 1.0X
*Join w 2 longs duplicated codegen=true 2080 / 2139 10.1 99.2 3.1X
*/
}

ignore("broadcast hash join, outer join long key") {
def broadcastHashJoinOuterJoinLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
runBenchmark("outer join w long", N) {
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "left")
val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
codegenBenchmark("outer join w long", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "left")
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}

/*
*Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*outer join w long codegen=false 3055 / 3189 6.9 145.7 1.0X
*outer join w long codegen=true 261 / 276 80.5 12.4 11.7X
*/
}

ignore("broadcast hash join, semi join long key") {
def broadcastHashJoinSemiJoinLongKey(): Unit = {
val N = 20 << 20
val M = 1 << 16
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
runBenchmark("semi join w long", N) {
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi")
val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v"))
codegenBenchmark("semi join w long", N) {
val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi")
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
df.count()
}

/*
*Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*semi join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*semi join w long codegen=false 1912 / 1990 11.0 91.2 1.0X
*semi join w long codegen=true 237 / 244 88.3 11.3 8.1X
*/
}

ignore("sort merge join") {
def sortMergeJoin(): Unit = {
val N = 2 << 20
runBenchmark("merge join", N) {
val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
codegenBenchmark("merge join", N) {
Member

merge join -> sort merge join

val df1 = spark.range(N).selectExpr(s"id * 2 as k1")
val df2 = spark.range(N).selectExpr(s"id * 3 as k2")
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
df.count()
}

/*
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*merge join codegen=false 1588 / 1880 1.3 757.1 1.0X
*merge join codegen=true 1477 / 1531 1.4 704.2 1.1X
*/
}

ignore("sort merge join with duplicates") {
def sortMergeJoinWithDuplicates(): Unit = {
val N = 2 << 20
runBenchmark("sort merge join", N) {
val df1 = sparkSession.range(N)
codegenBenchmark("sort merge join with duplicates", N) {
val df1 = spark.range(N)
.selectExpr(s"(id * 15485863) % ${N*10} as k1")
val df2 = sparkSession.range(N)
val df2 = spark.range(N)
.selectExpr(s"(id * 15485867) % ${N*10} as k2")
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
df.count()
}

/*
*Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
*sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*sort merge join codegen=false 3626 / 3667 0.6 1728.9 1.0X
*sort merge join codegen=true 3405 / 3438 0.6 1623.8 1.1X
*/
}

ignore("shuffle hash join") {
val N = 4 << 20
sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "10000000")
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
runBenchmark("shuffle hash join", N) {
val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 3).selectExpr(s"id * 3 as k2")
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
df.count()
def shuffleHashJoin(): Unit = {
val N: Long = 4 << 20
withSQLConf(
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
codegenBenchmark("shuffle hash join", N) {
val df1 = spark.range(N).selectExpr(s"id as k1")
val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
val df = df1.join(df2, col("k1") === col("k2"))
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
df.count()
}
}
}

/*
*Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
*Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
*shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
*-------------------------------------------------------------------------------------------
*shuffle hash join codegen=false 2005 / 2010 2.1 478.0 1.0X
*shuffle hash join codegen=true 1773 / 1792 2.4 422.7 1.1X
*/
override def runBenchmarkSuite(): Unit = {
Member

@dongjoon-hyun, Oct 11, 2018

Could you wrap the following calls (lines 168~177) in something like runBenchmark("Join Benchmark")?

broadcastHashJoinLongKey()
broadcastHashJoinLongKeyWithDuplicates()
broadcastHashJoinTwoIntKey()
broadcastHashJoinTwoLongKey()
broadcastHashJoinTwoLongKeyWithDuplicates()
broadcastHashJoinOuterJoinLongKey()
broadcastHashJoinSemiJoinLongKey()
sortMergeJoin()
sortMergeJoinWithDuplicates()
shuffleHashJoin()
}
}