Skip to content

Commit 6e7934e

Browse files
ueshin authored and rxin committed
[SPARK-1889] [SQL] Apply splitConjunctivePredicates to join condition while finding join keys.
When tables are equi-joined by multiple keys, `HashJoin` should be used, but `CartesianProduct` followed by `Filter` is used instead. The join keys are paired by `And` expressions, so we need to apply `splitConjunctivePredicates` to the join condition while finding join keys. Author: Takuya UESHIN <[email protected]> Closes #836 from ueshin/issues/SPARK-1889 and squashes the following commits: fe1c387 [Takuya UESHIN] Apply splitConjunctivePredicates to join condition while finding join keys. (cherry picked from commit bb88875) Signed-off-by: Reynold Xin <[email protected]>
1 parent 30d1df5 commit 6e7934e

2 files changed

Lines changed: 24 additions & 6 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,12 @@ object HashFilteredJoin extends Logging with PredicateHelper {
129129
// as join keys.
130130
def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
131131
val Join(left, right, joinType, _) = join
132-
val (joinPredicates, otherPredicates) = allPredicates.partition {
133-
case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
134-
(canEvaluate(l, right) && canEvaluate(r, left)) => true
135-
case _ => false
136-
}
132+
val (joinPredicates, otherPredicates) =
133+
allPredicates.flatMap(splitConjunctivePredicates).partition {
134+
case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
135+
(canEvaluate(l, right) && canEvaluate(r, left)) => true
136+
case _ => false
137+
}
137138

138139
val joinKeys = joinPredicates.map {
139140
case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.scalatest.FunSuite
2121

2222
import org.apache.spark.sql.TestData._
2323
import org.apache.spark.sql.catalyst.expressions._
24-
import org.apache.spark.sql.catalyst.plans.logical
24+
import org.apache.spark.sql.catalyst.plans._
2525
import org.apache.spark.sql.execution
2626
import org.apache.spark.sql.test.TestSQLContext._
2727
import org.apache.spark.sql.test.TestSQLContext.planner._
@@ -57,4 +57,21 @@ class PlannerSuite extends FunSuite {
5757
val planned = PartialAggregation(query)
5858
assert(planned.isEmpty)
5959
}
60+
61+
test("equi-join is hash-join") {
62+
val x = testData2.as('x)
63+
val y = testData2.as('y)
64+
val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)).queryExecution.analyzed
65+
val planned = planner.HashJoin(join)
66+
assert(planned.size === 1)
67+
}
68+
69+
test("multiple-key equi-join is hash-join") {
70+
val x = testData2.as('x)
71+
val y = testData2.as('y)
72+
val join = x.join(y, Inner,
73+
Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).queryExecution.analyzed
74+
val planned = planner.HashJoin(join)
75+
assert(planned.size === 1)
76+
}
6077
}

0 commit comments

Comments (0)