Skip to content

Commit 2ad11b6

Browse files
committed
[SPARK-46037][SQL] Correctness fix for Shuffled Hash Join build left without codegen
This is a re-submitting of #43938 to fix a join correctness bug caused by #41398 . Credits go to mcdull-zhang correctness fix Yes, the query result will be corrected. new test no Closes #47905 from cloud-fan/join. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit af5e0a2) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent dcfefd0 commit 2ad11b6

2 files changed

Lines changed: 22 additions & 5 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport {
138138
UnsafeProjection.create(streamedBoundKeys)
139139

140140
@transient protected[this] lazy val boundCondition = if (condition.isDefined) {
141-
if (joinType == FullOuter && buildSide == BuildLeft) {
142-
// Put join left side before right side. This is to be consistent with
143-
// `ShuffledHashJoinExec.fullOuterJoin`.
141+
if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) {
142+
// Put join left side before right side.
144143
Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _
145144
} else {
146145
Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _

sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}
2626
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
2727
import org.apache.spark.sql.execution.exchange.EnsureRequirements
2828
import org.apache.spark.sql.internal.SQLConf
29-
import org.apache.spark.sql.test.SharedSparkSession
29+
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestData}
3030
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
3131

32-
class OuterJoinSuite extends SparkPlanTest with SharedSparkSession {
32+
class OuterJoinSuite extends SparkPlanTest with SharedSparkSession with SQLTestData {
33+
setupTestData()
3334

3435
private val EnsureRequirements = new EnsureRequirements()
3536

@@ -325,4 +326,21 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession {
325326
(null, null, 7, 7.0)
326327
)
327328
)
329+
330+
testWithWholeStageCodegenOnAndOff(
331+
"SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off") { _ =>
332+
def join(hint: String): DataFrame = {
333+
sql(
334+
s"""
335+
|SELECT /*+ $hint */ *
336+
|FROM testData t1
337+
|LEFT OUTER JOIN
338+
|testData2 t2
339+
|ON key = a AND concat(value, b) = '12'
340+
|""".stripMargin)
341+
}
342+
val df1 = join("SHUFFLE_HASH(t1)")
343+
val df2 = join("SHUFFLE_MERGE(t1)")
344+
checkAnswer(df1, identity, df2.collect().toSeq)
345+
}
328346
}

0 commit comments

Comments
 (0)