Skip to content

Commit c21748a

Browse files
committed
address the conflicts of two rules: PushPredicateThroughProject and PushProjectThroughFilter.
1 parent c6221a4 commit c21748a

2 files changed

Lines changed: 35 additions & 9 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,12 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
6565
SamplePushDown,
6666
ReorderJoin,
6767
OuterJoinElimination,
68-
PushPredicateThroughJoin,
6968
PushPredicateThroughProject,
69+
PushPredicateThroughJoin,
7070
PushPredicateThroughGenerate,
7171
PushPredicateThroughAggregate,
7272
LimitPushDown,
73+
PushProjectThroughFilter,
7374
ColumnPruning,
7475
EliminateOperators,
7576
// Operator combine
@@ -91,6 +92,10 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
9192
SimplifyCasts,
9293
SimplifyCaseConversionExpressions,
9394
EliminateSerialization) ::
95+
// Because ColumnPruning is called after PushPredicateThroughProject, the predicate push down
96+
// is reversed. This batch is to ensure Filter is pushed below Project, if possible.
97+
Batch("Push Predicate Through Project", Once,
98+
PushPredicateThroughProject) ::
9499
Batch("Decimal Optimizations", FixedPoint(100),
95100
DecimalAggregates) ::
96101
Batch("LocalRelation", FixedPoint(100),
@@ -306,14 +311,28 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
306311
}
307312

308313
/**
309-
* Attempts to eliminate the reading of unneeded columns from the query plan using the following
310-
* transformations:
314+
* Attempts to eliminate the reading of unneeded columns from the query plan
315+
* by pushing Project through Filter.
311316
*
312-
* - Inserting Projections beneath the following operators:
313-
* - Aggregate
314-
* - Generate
315-
* - Project <- Join
316-
* - LeftSemiJoin
317+
* Note: This rule could reverse the effects of PushPredicateThroughProject.
318+
* This rule should be run before ColumnPruning for ensuring that Project can be
319+
* pushed as low as possible.
320+
*/
321+
object PushProjectThroughFilter extends Rule[LogicalPlan] {
322+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
323+
case p @ Project(projectList, f: Filter)
324+
if f.condition.deterministic && projectList.forall(_.deterministic) =>
325+
val required = f.references ++ p.references
326+
if ((f.inputSet -- required).nonEmpty) {
327+
p.copy(child = f.copy(child = ColumnPruning.prunedChild(f.child, required)))
328+
} else {
329+
p
330+
}
331+
}
332+
}
333+
334+
/**
335+
* Attempts to eliminate the reading of unneeded columns from the query plan
317336
*/
318337
object ColumnPruning extends Rule[LogicalPlan] {
319338
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
@@ -392,7 +411,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
392411
}
393412

394413
/** Applies a projection only when the child is producing unnecessary attributes */
395-
private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
414+
def prunedChild(c: LogicalPlan, allReferences: AttributeSet): LogicalPlan =
396415
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
397416
Project(c.output.filter(allReferences.contains), c)
398417
} else {
@@ -874,6 +893,10 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
874893
* that were defined in the projection.
875894
*
876895
* This heuristic is valid assuming the expression evaluation cost is minimal.
896+
*
897+
* Note: Because PushProjectThroughFilter could reverse the effect of PushPredicateThroughProject,
898+
* PushPredicateThroughProject needs to be called before the other Predicate Push Down rules for
899+
* ensuring the predicates can be pushed as low as possible.
877900
*/
878901
object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
879902
def apply(plan: LogicalPlan): LogicalPlan = plan transform {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ class ColumnPruningSuite extends PlanTest {
3434

3535
object Optimize extends RuleExecutor[LogicalPlan] {
3636
val batches = Batch("Column pruning", FixedPoint(100),
37+
PushPredicateThroughProject,
38+
PushPredicateThroughJoin,
39+
PushProjectThroughFilter,
3740
ColumnPruning,
3841
EliminateOperators,
3942
CollapseProject) :: Nil

0 commit comments

Comments
 (0)