Skip to content

Commit 718b6ba

Browse files
committed
[SPARK-17274][SQL] Move join optimizer rules into a separate file
## What changes were proposed in this pull request? As part of breaking Optimizer.scala apart, this patch moves various join rules into a single file. ## How was this patch tested? This should be covered by existing tests. Author: Reynold Xin <[email protected]> Closes #14846 from rxin/SPARK-17274.
1 parent 5aad450 commit 718b6ba

2 files changed

Lines changed: 134 additions & 106 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 0 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -799,112 +799,6 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
799799
}
800800
}
801801

802-
/**
803-
* Reorder the joins and push all the conditions into join, so that the bottom ones have at least
804-
* one condition.
805-
*
806-
* The order of joins will not be changed if all of them already have at least one condition.
807-
*/
808-
object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
809-
810-
/**
811-
* Join a list of plans together and push down the conditions into them.
812-
*
813-
* The joined plan are picked from left to right, prefer those has at least one join condition.
814-
*
815-
* @param input a list of LogicalPlans to join.
816-
* @param conditions a list of condition for join.
817-
*/
818-
@tailrec
819-
def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
820-
assert(input.size >= 2)
821-
if (input.size == 2) {
822-
val (joinConditions, others) = conditions.partition(
823-
e => !SubqueryExpression.hasCorrelatedSubquery(e))
824-
val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
825-
if (others.nonEmpty) {
826-
Filter(others.reduceLeft(And), join)
827-
} else {
828-
join
829-
}
830-
} else {
831-
val left :: rest = input.toList
832-
// find out the first join that have at least one join condition
833-
val conditionalJoin = rest.find { plan =>
834-
val refs = left.outputSet ++ plan.outputSet
835-
conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
836-
.exists(_.references.subsetOf(refs))
837-
}
838-
// pick the next one if no condition left
839-
val right = conditionalJoin.getOrElse(rest.head)
840-
841-
val joinedRefs = left.outputSet ++ right.outputSet
842-
val (joinConditions, others) = conditions.partition(
843-
e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
844-
val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
845-
846-
// should not have reference to same logical plan
847-
createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
848-
}
849-
}
850-
851-
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
852-
case j @ ExtractFiltersAndInnerJoins(input, conditions)
853-
if input.size > 2 && conditions.nonEmpty =>
854-
createOrderedJoin(input, conditions)
855-
}
856-
}
857-
858-
/**
859-
* Elimination of outer joins, if the predicates can restrict the result sets so that
860-
* all null-supplying rows are eliminated
861-
*
862-
* - full outer -> inner if both sides have such predicates
863-
* - left outer -> inner if the right side has such predicates
864-
* - right outer -> inner if the left side has such predicates
865-
* - full outer -> left outer if only the left side has such predicates
866-
* - full outer -> right outer if only the right side has such predicates
867-
*
868-
* This rule should be executed before pushing down the Filter
869-
*/
870-
object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
871-
872-
/**
873-
* Returns whether the expression returns null or false when all inputs are nulls.
874-
*/
875-
private def canFilterOutNull(e: Expression): Boolean = {
876-
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
877-
val attributes = e.references.toSeq
878-
val emptyRow = new GenericInternalRow(attributes.length)
879-
val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
880-
v == null || v == false
881-
}
882-
883-
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
884-
val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
885-
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
886-
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
887-
888-
val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
889-
val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
890-
891-
join.joinType match {
892-
case RightOuter if leftHasNonNullPredicate => Inner
893-
case LeftOuter if rightHasNonNullPredicate => Inner
894-
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
895-
case FullOuter if leftHasNonNullPredicate => LeftOuter
896-
case FullOuter if rightHasNonNullPredicate => RightOuter
897-
case o => o
898-
}
899-
}
900-
901-
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
902-
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
903-
val newJoinType = buildNewJoinType(f, j)
904-
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
905-
}
906-
}
907-
908802
/**
909803
* Pushes down [[Filter]] operators where the `condition` can be
910804
* evaluated using only the attributes of the left or right side of a join. Other
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import scala.annotation.tailrec
21+
22+
import org.apache.spark.sql.catalyst.expressions._
23+
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
24+
import org.apache.spark.sql.catalyst.plans._
25+
import org.apache.spark.sql.catalyst.plans.logical._
26+
import org.apache.spark.sql.catalyst.rules._
27+
28+
29+
/**
30+
* Reorder the joins and push all the conditions into join, so that the bottom ones have at least
31+
* one condition.
32+
*
33+
* The order of joins will not be changed if all of them already have at least one condition.
34+
*/
35+
object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
36+
37+
/**
38+
* Join a list of plans together and push down the conditions into them.
39+
*
40+
* The joined plan are picked from left to right, prefer those has at least one join condition.
41+
*
42+
* @param input a list of LogicalPlans to join.
43+
* @param conditions a list of condition for join.
44+
*/
45+
@tailrec
46+
def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
47+
assert(input.size >= 2)
48+
if (input.size == 2) {
49+
val (joinConditions, others) = conditions.partition(
50+
e => !SubqueryExpression.hasCorrelatedSubquery(e))
51+
val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
52+
if (others.nonEmpty) {
53+
Filter(others.reduceLeft(And), join)
54+
} else {
55+
join
56+
}
57+
} else {
58+
val left :: rest = input.toList
59+
// find out the first join that have at least one join condition
60+
val conditionalJoin = rest.find { plan =>
61+
val refs = left.outputSet ++ plan.outputSet
62+
conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
63+
.exists(_.references.subsetOf(refs))
64+
}
65+
// pick the next one if no condition left
66+
val right = conditionalJoin.getOrElse(rest.head)
67+
68+
val joinedRefs = left.outputSet ++ right.outputSet
69+
val (joinConditions, others) = conditions.partition(
70+
e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
71+
val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
72+
73+
// should not have reference to same logical plan
74+
createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
75+
}
76+
}
77+
78+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
79+
case j @ ExtractFiltersAndInnerJoins(input, conditions)
80+
if input.size > 2 && conditions.nonEmpty =>
81+
createOrderedJoin(input, conditions)
82+
}
83+
}
84+
85+
86+
/**
87+
* Elimination of outer joins, if the predicates can restrict the result sets so that
88+
* all null-supplying rows are eliminated
89+
*
90+
* - full outer -> inner if both sides have such predicates
91+
* - left outer -> inner if the right side has such predicates
92+
* - right outer -> inner if the left side has such predicates
93+
* - full outer -> left outer if only the left side has such predicates
94+
* - full outer -> right outer if only the right side has such predicates
95+
*
96+
* This rule should be executed before pushing down the Filter
97+
*/
98+
object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
99+
100+
/**
101+
* Returns whether the expression returns null or false when all inputs are nulls.
102+
*/
103+
private def canFilterOutNull(e: Expression): Boolean = {
104+
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
105+
val attributes = e.references.toSeq
106+
val emptyRow = new GenericInternalRow(attributes.length)
107+
val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
108+
v == null || v == false
109+
}
110+
111+
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
112+
val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
113+
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
114+
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
115+
116+
val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
117+
val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
118+
119+
join.joinType match {
120+
case RightOuter if leftHasNonNullPredicate => Inner
121+
case LeftOuter if rightHasNonNullPredicate => Inner
122+
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
123+
case FullOuter if leftHasNonNullPredicate => LeftOuter
124+
case FullOuter if rightHasNonNullPredicate => RightOuter
125+
case o => o
126+
}
127+
}
128+
129+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
130+
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
131+
val newJoinType = buildNewJoinType(f, j)
132+
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
133+
}
134+
}

0 commit comments

Comments
 (0)