Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -1457,8 +1458,8 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
val (leftEvaluateCondition, rest) =
pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
val (rightEvaluateCondition, commonCondition) =
rest.partition(expr => expr.references.subsetOf(right.outputSet))
val rightEvaluateCondition = pushDownCandidates.filter(_.references.subsetOf(right.outputSet))
val commonCondition = rest.filterNot(_.references.subsetOf(right.outputSet))

(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
}
Expand All @@ -1468,6 +1469,12 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
case _ => false
}

/**
 * Wraps `plan` in a single [[Filter]] built from `conditions`.
 *
 * Conditions that are semantically equal to `TrueLiteral` are discarded first; the remaining
 * ones are combined with `And`. If nothing remains, `plan` is returned unchanged.
 *
 * @param conditions predicates to place on top of `plan`
 * @param plan the plan to filter
 * @return `plan` wrapped in a `Filter`, or `plan` itself when no condition is left
 */
private def pushDownJoinConditions(conditions: Seq[Expression], plan: LogicalPlan) = {
conditions
.filterNot(_.semanticEquals(TrueLiteral)) // Pushing down a literal true condition is useless.
Copy link
Contributor

@cloud-fan cloud-fan Mar 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be optimized by BooleanSimplification already? true And cond -> cond

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is another issue. Will be fixed by another PR.

// Fold the surviving conditions into one conjunction; an empty list yields no Filter at all.
.reduceLeftOption(And).map(Filter(_, plan)).getOrElse(plan)
}

/** Rewrites `plan` by passing `applyLocally` to its `transform` method. */
def apply(plan: LogicalPlan): LogicalPlan = plan.transform(applyLocally)

val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
Expand Down Expand Up @@ -1526,26 +1533,22 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
joinType match {
case _: InnerLike | LeftSemi =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newLeft = pushDownJoinConditions(leftJoinConditions, left)
val newRight = pushDownJoinConditions(rightJoinConditions, right)
val newJoinCond = commonJoinCondition.reduceLeftOption(And)

Join(newLeft, newRight, joinType, newJoinCond, hint)
case RightOuter =>
// push down the left side only join filter for left side sub query
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
val newLeft = pushDownJoinConditions(leftJoinConditions, left)
val newRight = right
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, RightOuter, newJoinCond, hint)
case LeftOuter | LeftAnti | ExistenceJoin(_) =>
// push down the right side only join filter for right sub query
val newLeft = left
val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newRight = pushDownJoinConditions(rightJoinConditions, right)
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, joinType, newJoinCond, hint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
Expand All @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules._
* - Join with one or two empty children (including Intersect/Except).
* 2. Unary-node Logical Plans
* - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
* - Join with false condition.
* - Aggregate with all empty children and at least one grouping expression.
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
*/
Expand Down Expand Up @@ -71,24 +73,32 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
// Joins on empty LocalRelations generated from streaming sources are not eliminated,
// because stateful streaming joins must perform state management operations beyond
// just processing the input data.
case p @ Join(_, _, joinType, _, _)
case p @ Join(_, _, joinType, conditionOpt, _)
if !p.children.exists(_.isStreaming) =>
val isLeftEmpty = isEmptyLocalRelation(p.left)
val isRightEmpty = isEmptyLocalRelation(p.right)
if (isLeftEmpty || isRightEmpty) {
val isFalseCondition = conditionOpt match {
case Some(FalseLiteral) => true
case _ => false
}
if (isLeftEmpty || isRightEmpty || isFalseCondition) {
joinType match {
case _: InnerLike => empty(p)
// Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
case LeftSemi if isRightEmpty => empty(p)
case LeftAnti if isRightEmpty => p.left
case LeftSemi if isRightEmpty | isFalseCondition => empty(p)
case LeftAnti if isRightEmpty | isFalseCondition => p.left
case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
case LeftOuter | FullOuter if isRightEmpty =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
case RightOuter if isRightEmpty => empty(p)
case RightOuter | FullOuter if isLeftEmpty =>
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
case LeftOuter if isFalseCondition =>
Project(p.left.output ++ nullValueProjectList(p.right), p.left)
case RightOuter if isFalseCondition =>
Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
case _ => p
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1384,4 +1384,25 @@ class FilterPushdownSuite extends PlanTest {
condition = Some("x.a".attr === "z.a".attr)).analyze
comparePlans(optimized, correctAnswer)
}

test("SPARK-28220: Push down true join condition for inner join") {
  // An inner join whose only condition is the literal `true` is expected to be optimized
  // into the same join with no condition at all.
  val leftSide = testRelation.subquery('x)
  val rightSide = testRelation.subquery('y)
  val joinWithTrueCondition = leftSide.join(rightSide, condition = Some(true))

  val actualPlan = Optimize.execute(joinWithTrueCondition.analyze)
  val expectedPlan = leftSide.join(rightSide, condition = None).analyze

  comparePlans(actualPlan, expectedPlan)
}

test("SPARK-28220: Should not push down true join condition for left/right join") {
  // For left and right outer joins the optimized plan is expected to equal the analyzed input.
  for (outerJoinType <- Seq(LeftOuter, RightOuter)) {
    val leftSide = testRelation.subquery('x)
    val rightSide = testRelation.subquery('y)
    val joinWithTrueCondition =
      leftSide.join(rightSide, joinType = outerJoinType, condition = Some(true))

    val actualPlan = Optimize.execute(joinWithTrueCondition.analyze)
    comparePlans(actualPlan, joinWithTrueCondition.analyze)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
Expand Down Expand Up @@ -147,7 +148,29 @@ class PropagateEmptyRelationSuite extends PlanTest {
testcases.foreach { case (left, right, jt, answer) =>
val query = testRelation1
.where(left)
.join(testRelation2.where(right), joinType = jt, condition = Some('a.attr == 'b.attr))
.join(testRelation2.where(right), joinType = jt, condition = Some('a.attr === 'b.attr))
val optimized = Optimize.execute(query.analyze)
val correctAnswer =
answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
comparePlans(optimized, correctAnswer)
}
}

test("SPARK-28220: Propagate empty relation through Join if condition is FalseLiteral") {
val testcases = Seq(
(Inner, Some(LocalRelation('a.int, 'b.int))),
(Cross, Some(LocalRelation('a.int, 'b.int))),
(LeftOuter,
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
(RightOuter,
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
(FullOuter, None),
(LeftAnti, Some(testRelation1)),
(LeftSemi, Some(LocalRelation('a.int)))
)

testcases.foreach { case (jt, answer) =>
val query = testRelation1.join(testRelation2, joinType = jt, condition = Some(FalseLiteral))
val optimized = Optimize.execute(query.analyze)
val correctAnswer =
answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
Expand Down