Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package org.apache.spark.sql.catalyst.optimizer

import scala.collection.immutable.HashSet

import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.FullOuter
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.RightOuter
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.planning.FilterAndInnerJoins
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._
Expand All @@ -44,6 +42,7 @@ object DefaultOptimizer extends Optimizer {
// Operator push down
SetOperationPushDown,
SamplePushDown,
ReorderJoin,
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
Expand Down Expand Up @@ -711,6 +710,49 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel
}
}

/**
* Reorder the joins so that the bottom ones have at least one condition.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add to this comment what makes this rule stable? It's not obvious from reading the code.

object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this comment if it is the same as the object comment or augment this with more detail.

Can you comment what the input arguments are? What is input? The least common ancestor of joins? Similar for conditions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

* Reorder the joins so that the bottom ones have at least one condition.
*/
def reorder(
input: LogicalPlan,
joins: Seq[LogicalPlan],
conditions: Seq[Expression]): LogicalPlan = {
// Arguments:
//  - `input`: the plan joined so far; it becomes the left side of the next join.
//  - `joins`: the plans that still have to be joined on top of `input`, in their original order.
//  - `conditions`: the predicates still available for choosing which plan to join next.
// Every join built here is an inner join without a condition; `apply` keeps the complete
// predicate in a Filter on top of the returned plan, so no condition is lost here.

// Drop the conditions that `input` alone can evaluate: they cannot help to pick the next plan.
val otherConditions = conditions.filterNot { cond =>
cond.references.subsetOf(input.outputSet)
}
if (joins.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have the pattern return None if there are no joins so you don't have to handle this case.

input
} else if (otherConditions.isEmpty) {
// no condition is left for these joins, so keep them in their original order
(Seq(input) ++ joins).reduceLeft(Join(_, _, Inner, None))
} else {
// Find the first plan that has at least one real join condition with `input`: a condition
// that can be evaluated on `input` and `plan` together, but on neither of them alone
// (conditions on `input` alone were already removed above). A predicate that only touches
// `plan` is a plain filter and must not cause `plan` to be picked.
val conditionalJoin = joins.find { plan =>
val refs = input.outputSet ++ plan.outputSet
otherConditions.exists { cond =>
cond.references.subsetOf(refs) && !cond.references.subsetOf(plan.outputSet)
}
}
// A remaining condition may span three or more relations, or touch a single plan only, in
// which case no plan qualifies yet. Fall back to the next plan in the original order instead
// of failing; `joins` is known to be non-empty in this branch.
val picked = conditionalJoin.getOrElse(joins.head)
val joined = Join(input, picked, Inner, None)
reorder(joined, joins.filterNot(_ eq picked), otherConditions)
}
}

/**
* Applies the rule to every subtree of `plan` that matches [[FilterAndInnerJoins]] with more
* than one joined plan: the joins are rebuilt by `reorder`, and the original filter conditions
* are placed back in a single [[Filter]] on top of the rebuilt joins.
*/
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// TODO: support outer join
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would consider omitting this

// With a single joined plan there is nothing to reorder, hence the `joins.size > 1` guard.
case FilterAndInnerJoins(input, joins, filterConditions) if joins.size > 1 =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExtractFiltersAndInnerJoins?

// `reduceLeft` below needs at least one condition. NOTE(review): the pattern builds this list
// with splitConjunctivePredicates, which presumably never returns an empty Seq -- confirm.
assert(filterConditions.nonEmpty)
val joined = reorder(input, joins, filterConditions)
// The rebuilt joins carry no condition, so the whole conjunction is re-applied here.
// NOTE(review): presumably PushPredicateThroughJoin, listed right after this rule in
// DefaultOptimizer, then moves the conjuncts into the joins -- confirm.
Filter(filterConditions.reduceLeft(And), joined)
}
}

/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef

/**
* A pattern that matches any number of project or filter operations on top of another relational
Expand Down Expand Up @@ -132,6 +131,38 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
}
}

/**
* A pattern that collects the filter and inner joins.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it much more work to extract all the filters? For example, if there is a filter after the inner join of input and plan 1. We'd ideally use this for predicate propagation as well.

For example

select * from t1 join t2 on t1.key = t2.key and t1.key = 5. If we collected all the filters, this could be used to infer t2.key = 5 and push that down to t2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*
* Filter
* |
* inner Join
* / \ ----> (input, Seq(plan1, plan2), filters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment to match the return order of the function (input, joins, filters)

* inner join plan2
* / \
* input plan1
*/
object FilterAndInnerJoins extends PredicateHelper {
// Matches a Filter whose child is an inner Join without a condition. On a match it returns
// (input, joins, conditions): `input` is the plan at the bottom of the left spine of
// condition-less inner joins, `joins` holds the right children of those joins from the
// bottom-most join upwards, and `conditions` is the filter condition split by
// splitConjunctivePredicates. Any other plan yields None.
def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[LogicalPlan], Seq[Expression])] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know I suggested this interface, but now I'm questioning why we are differentiating the input from planN. Is there a reason we shouldn't just return a single Seq[LogicalPlan]?

plan match {
case f @ Filter(filterCondition, j @ Join(left, right, Inner, None)) =>

// Flatten all inner joins that are next to each other and have no condition.
// Only the left child is followed; a right child is returned as-is even if it is a join.
def flattenJoin(plan: LogicalPlan): (LogicalPlan, Seq[LogicalPlan]) = plan match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Building on what @nongli said, I think that you can have the top level match handle both filters and joins, which should make this more powerful (similar to what we do in PhysicalOperation here).

case Join(left, right, Inner, None) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't these have conditions? Seems this would just go into the returned filters

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

// Flatten the left side first, then append this join's right child, so that `joins`
// ends up ordered from the bottom-most join to the top-most one.
val (input, joins) = flattenJoin(left)
(input, joins ++ Seq(right))
// Anything that is not a condition-less inner join ends the chain and becomes `input`.
case _ => (plan, Seq())
}

val allConditions = splitConjunctivePredicates(filterCondition)
val (input, joins) = flattenJoin(j)
Some((input, joins, allConditions))

case _ => None
}
}

/**
* A pattern that collects all adjacent unions and returns their children as a Seq.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class FilterPushdownSuite extends PlanTest {
CombineFilters,
PushPredicateThroughProject,
BooleanSimplification,
ReorderJoin,
PushPredicateThroughJoin,
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
Expand Down Expand Up @@ -548,6 +549,25 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
}

// Checks that condition-less inner joins under a filter are reordered so that each join gets a
// condition: x, y and z are joined in that order and filtered on `x.b = z.b && y.d = z.a`; the
// expected plan joins x with z on `x.b = z.b` first and then joins y on `y.d = z.a`.
test("joins: reorder inner joins") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a big enough optimization that we might put it in its own suite.

// x and z are two aliases of the same relation; y is an alias of a different one.
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)
val z = testRelation.subquery('z)

// No condition links x and y, so the original bottom join (x join y) has no condition at all.
val originalQuery = {
x.join(y).join(z)
.where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
}

val optimized = Optimize.execute(originalQuery.analyze)
// Expected: z is joined before y, and both filter conjuncts have become join conditions.
val correctAnswer =
x.join(z, condition = Some("x.b".attr === "z.b".attr))
.join(y, condition = Some("y.d".attr === "z.a".attr))
.analyze

comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
}

val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))

test("generate: predicate referenced no generated column") {
Expand Down Expand Up @@ -750,4 +770,5 @@ class FilterPushdownSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: spurious change

}