From cc4323f36b3a38f193f049e8949c034adbf69c52 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 24 Feb 2016 23:49:17 -0800 Subject: [PATCH 1/7] NullFiltering rule in catalyst --- .../sql/catalyst/optimizer/Optimizer.scala | 47 ++ .../BooleanSimplificationSuite.scala | 5 + .../optimizer/NullFilteringSuite.scala | 531 ++++++++++++++++++ 3 files changed, 583 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2aeb9575f1dd..125660788467 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -76,6 +76,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { CombineLimits, CombineUnions, // Constant folding and strength reduction + NullFiltering, NullPropagation, OptimizeIn, ConstantFolding, @@ -585,6 +586,52 @@ object NullPropagation extends Rule[LogicalPlan] { } } +/** + * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness + * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath + * existing Filters and Join operators and are inferred based on their data constraints. + * + * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and + * LeftSemi joins. 
+ */ +object NullFiltering extends Rule[LogicalPlan] with PredicateHelper { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case filter @ Filter(condition, child: LogicalPlan) => + // We generate a list of additional isNotNull filters from the operator's existing constraints + // but remove those that are either already part of the filter condition or are part of the + // operator's child constraints. + val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) -- + (child.constraints ++ splitConjunctivePredicates(condition)) + val newCondition = if (newIsNotNullConstraints.nonEmpty) { + And(newIsNotNullConstraints.reduce(And), condition) + } else { + condition + } + Filter(newCondition, child) + + case join @ Join(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, + condition: Option[Expression]) => + val leftIsNotNullConstraints = join.constraints + .filter(_.isInstanceOf[IsNotNull]) + .filter(_.references.subsetOf(left.outputSet)) -- left.constraints + val rightIsNotNullConstraints = + join.constraints + .filter(_.isInstanceOf[IsNotNull]) + .filter(_.references.subsetOf(right.outputSet)) -- right.constraints + val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) { + Filter(leftIsNotNullConstraints.reduce(And), left) + } else { + left + } + val newRightChild = if (rightIsNotNullConstraints.nonEmpty) { + Filter(rightIsNotNullConstraints.reduce(And), right) + } else { + right + } + Join(newLeftChild, newRightChild, joinType, condition) + } +} + /** * Replaces [[Expression Expressions]] that can be statically evaluated with * equivalent [[Literal]] values. 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 3e52441519ae..d4641444eaba 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -129,4 +129,9 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { testRelation.where('a > 2 || ('b > 3 && 'b < 5))) comparePlans(actual, expected) } + + test("evaluate isNotNull expressions before others") { + checkCondition(input = 'a > 0 && IsNotNull('b) && 'c < 3, + expected = IsNotNull('b) && 'a > 0 && 'c < 3) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala new file mode 100644 index 000000000000..a8c02f2e49e5 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.types.IntegerType + +class NullFilteringSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = Batch("NullFiltering", FixedPoint(50), NullFiltering) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + val testRelation1 = LocalRelation('d.int) + + test("filter: filter out nulls") { + val originalQuery = + testRelation + .where('a === 1 && 'b > 2) + + val correctAnswer = + testRelation + .where(IsNotNull('a) && IsNotNull('b) && 'a === 1 && 'b > 2) + .analyze + + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, correctAnswer) + } + + test("inner join: filter out nulls on either side") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y) + .where("x.b".attr === "y.b".attr) + } + + val left = testRelation.where(IsNotNull('b)) + val right = testRelation.where(IsNotNull('b)) + val correctAnswer = left.join(right, joinType = Inner, + condition = Some("x.b".attr === "y.b".attr)).analyze + + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, correctAnswer) + } + + test("joins: push to one side") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y) + .where("x.b".attr === 1) + } + + val optimized = 
Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b === 1) + val right = testRelation + val correctAnswer = + left.join(right).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push to one side after transformCondition") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = { + x.join(y) + .where(("x.a".attr === 1 && "y.d".attr === "x.b".attr) || + ("x.a".attr === 1 && "y.d".attr === "x.c".attr)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('a === 1) + val right = testRelation1 + val correctAnswer = + left.join(right, condition = Some("d".attr === "b".attr || "d".attr === "c".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: rewrite filter to push to either side") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y) + .where("x.b".attr === 1 && "y.b".attr === 2) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b === 1) + val right = testRelation.where('b === 2) + val correctAnswer = + left.join(right).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left semi join") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = { + x.join(y, LeftSemi, Option("x.a".attr === "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b >= 1) + val right = testRelation1.where('d >= 2) + val correctAnswer = + left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #1") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter) + .where("x.b".attr === 1 && "y.b".attr === 2) + } + + 
val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b === 1) + val correctAnswer = + left.join(y, LeftOuter).where("y.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #1") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter) + .where("x.b".attr === 1 && "y.b".attr === 2) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val right = testRelation.where('b === 2).subquery('d) + val correctAnswer = + x.join(right, RightOuter).where("x.b".attr === 1).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #2") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter, Some("x.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b === 2).subquery('d) + val correctAnswer = + left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #2") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val right = testRelation.where('b === 2).subquery('d) + val correctAnswer = + x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #3") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2) + } + + val optimized = 
Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b === 2).subquery('l) + val right = testRelation.where('b === 1).subquery('r) + val correctAnswer = + left.join(right, LeftOuter).where("r.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #3") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val right = testRelation.where('b === 2).subquery('r) + val correctAnswer = + x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #4") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b === 2).subquery('l) + val right = testRelation.where('b === 1).subquery('r) + val correctAnswer = + left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #4") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.subquery('l) + val right = testRelation.where('b === 2).subquery('r) + val correctAnswer = + left.join(right, RightOuter, Some("r.b".attr === 1)). 
+ where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down left outer join #5") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b === 2).subquery('l) + val right = testRelation.where('b === 1).subquery('r) + val correctAnswer = + left.join(right, LeftOuter, Some("l.a".attr===3)). + where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #5") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('a === 3).subquery('l) + val right = testRelation.where('b === 2).subquery('r) + val correctAnswer = + left.join(right, RightOuter, Some("r.b".attr === 1)). 
+ where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: can't push down") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, condition = Some("x.b".attr === "y.b".attr)) + } + val optimized = Optimize.execute(originalQuery.analyze) + + comparePlans(analysis.EliminateSubqueryAliases(originalQuery.analyze), optimized) + } + + test("joins: conjunctive predicates") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y) + .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1) && ("y.a".attr === 1)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('a === 1).subquery('x) + val right = testRelation.where('a === 1).subquery('y) + val correctAnswer = + left.join(right, condition = Some("x.b".attr === "y.b".attr)) + .analyze + + comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) + } + + test("joins: conjunctive predicates #2") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y) + .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('a === 1).subquery('x) + val right = testRelation.subquery('y) + val correctAnswer = + left.join(right, condition = Some("x.b".attr === "y.b".attr)) + .analyze + + comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) + } + + test("joins: conjunctive predicates #3") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val z = testRelation.subquery('z) + + val originalQuery = { + z.join(x.join(y)) + .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1) && + ("z.a".attr >= 3) && ("z.a".attr === "x.b".attr)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val lleft = 
testRelation.where('a >= 3).subquery('z) + val left = testRelation.where('a === 1).subquery('x) + val right = testRelation.subquery('y) + val correctAnswer = + lleft.join( + left.join(right, condition = Some("x.b".attr === "y.b".attr)), + condition = Some("z.a".attr === "x.b".attr)) + .analyze + + comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) + } + + val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType)) + + test("generate: predicate referenced no generated column") { + val originalQuery = { + testRelationWithArrayType + .generate(Explode('c_arr), true, false, Some("arr")) + .where(('b >= 5) && ('a > 6)) + } + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = { + testRelationWithArrayType + .where(('b >= 5) && ('a > 6)) + .generate(Explode('c_arr), true, false, Some("arr")).analyze + } + + comparePlans(optimized, correctAnswer) + } + + test("generate: part of conjuncts referenced generated column") { + val generator = Explode('c_arr) + val originalQuery = { + testRelationWithArrayType + .generate(generator, true, false, Some("arr")) + .where(('b >= 5) && ('c > 6)) + } + val optimized = Optimize.execute(originalQuery.analyze) + val referenceResult = { + testRelationWithArrayType + .where('b >= 5) + .generate(generator, true, false, Some("arr")) + .where('c > 6).analyze + } + + // Since newly generated columns get different ids every time being analyzed + // e.g. comparePlans(originalQuery.analyze, originalQuery.analyze) fails. + // So we check operators manually here. 
+ // Filter("c" > 6) + assertResult(classOf[Filter])(optimized.getClass) + assertResult(1)(optimized.asInstanceOf[Filter].condition.references.size) + assertResult("c"){ + optimized.asInstanceOf[Filter].condition.references.toSeq(0).name + } + + // the rest part + comparePlans(optimized.children(0), referenceResult.children(0)) + } + + test("generate: all conjuncts referenced generated column") { + val originalQuery = { + testRelationWithArrayType + .generate(Explode('c_arr), true, false, Some("arr")) + .where(('c > 6) || ('b > 5)).analyze + } + val optimized = Optimize.execute(originalQuery) + + comparePlans(optimized, originalQuery) + } + + test("push project and filter down into sample") { + val x = testRelation.subquery('x) + val originalQuery = + Sample(0.0, 0.6, false, 11L, x)().select('a) + + val originalQueryAnalyzed = + EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery)) + + val optimized = Optimize.execute(originalQueryAnalyzed) + + val correctAnswer = + Sample(0.0, 0.6, false, 11L, x.select('a))() + + comparePlans(optimized, correctAnswer.analyze) + } + + test("aggregate: push down filter when filter on group by expression") { + val originalQuery = testRelation + .groupBy('a)('a, count('b) as 'c) + .select('a, 'c) + .where('a === 2) + + val optimized = Optimize.execute(originalQuery.analyze) + + val correctAnswer = testRelation + .where('a === 2) + .groupBy('a)('a, count('b) as 'c) + .analyze + comparePlans(optimized, correctAnswer) + } + + test("aggregate: don't push down filter when filter not on group by expression") { + val originalQuery = testRelation + .select('a, 'b) + .groupBy('a)('a, count('b) as 'c) + .where('c === 2L) + + val optimized = Optimize.execute(originalQuery.analyze) + + comparePlans(optimized, originalQuery.analyze) + } + + test("aggregate: push down filters partially which are subset of group by expressions") { + val originalQuery = testRelation + .select('a, 'b) + .groupBy('a)('a, count('b) as 'c) + .where('c 
=== 2L && 'a === 3) + + val optimized = Optimize.execute(originalQuery.analyze) + + val correctAnswer = testRelation + .select('a, 'b) + .where('a === 3) + .groupBy('a)('a, count('b) as 'c) + .where('c === 2L) + .analyze + + comparePlans(optimized, correctAnswer) + } + + test("aggregate: push down filters with alias") { + val originalQuery = testRelation + .select('a, 'b) + .groupBy('a)(('a + 1) as 'aa, count('b) as 'c) + .where(('c === 2L || 'aa > 4) && 'aa < 3) + + val optimized = Optimize.execute(originalQuery.analyze) + + val correctAnswer = testRelation + .select('a, 'b) + .where('a + 1 < 3) + .groupBy('a)(('a + 1) as 'aa, count('b) as 'c) + .where('c === 2L || 'aa > 4) + .analyze + + comparePlans(optimized, correctAnswer) + } +} From 28050b3a38607b27846cc1aa879eca82d1f21644 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 2 Mar 2016 13:07:27 -0800 Subject: [PATCH 2/7] unit tests --- .../BooleanSimplificationSuite.scala | 5 - .../optimizer/NullFilteringSuite.scala | 501 +----------------- .../spark/sql/catalyst/plans/PlanTest.scala | 18 +- 3 files changed, 34 insertions(+), 490 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index d4641444eaba..3e52441519ae 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -129,9 +129,4 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { testRelation.where('a > 2 || ('b > 3 && 'b < 5))) comparePlans(actual, expected) } - - test("evaluate isNotNull expressions before others") { - checkCondition(input = 'a > 0 && IsNotNull('b) && 'c < 3, - expected = IsNotNull('b) && 'a > 0 && 'c < 3) - } } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala index a8c02f2e49e5..5f936f99c831 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala @@ -17,515 +17,52 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.types.IntegerType class NullFilteringSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { - val batches = Batch("NullFiltering", FixedPoint(50), NullFiltering) :: Nil + val batches = Batch("NullFiltering", Once, NullFiltering) :: + Batch("CombineFilters", Once, CombineFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) - val testRelation1 = LocalRelation('d.int) - - test("filter: filter out nulls") { - val originalQuery = - testRelation - .where('a === 1 && 'b > 2) - - val correctAnswer = - testRelation - .where(IsNotNull('a) && IsNotNull('b) && 'a === 1 && 'b > 2) - .analyze - + test("filter: filter out nulls in condition") { + val originalQuery = testRelation.where('a === 1) + val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze val optimized = Optimize.execute(originalQuery.analyze) comparePlans(optimized, correctAnswer) } - test("inner join: filter out nulls on either side") { + test("join: filter out nulls on either side") { val x = testRelation.subquery('x) val y = 
testRelation.subquery('y) - - val originalQuery = { - x.join(y) - .where("x.b".attr === "y.b".attr) - } - - val left = testRelation.where(IsNotNull('b)) - val right = testRelation.where(IsNotNull('b)) - val correctAnswer = left.join(right, joinType = Inner, - condition = Some("x.b".attr === "y.b".attr)).analyze - - val optimized = Optimize.execute(originalQuery.analyze) - comparePlans(optimized, correctAnswer) - } - - test("joins: push to one side") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y) - .where("x.b".attr === 1) - } - + val originalQuery = x.join(y, + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)) + val left = x.where(IsNotNull('a) && IsNotNull('b)) + val right = y.where(IsNotNull('a) && IsNotNull('c)) + val correctAnswer = left.join(right, + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b === 1) - val right = testRelation - val correctAnswer = - left.join(right).analyze - comparePlans(optimized, correctAnswer) } - test("joins: push to one side after transformCondition") { - val x = testRelation.subquery('x) - val y = testRelation1.subquery('y) - - val originalQuery = { - x.join(y) - .where(("x.a".attr === 1 && "y.d".attr === "x.b".attr) || - ("x.a".attr === 1 && "y.d".attr === "x.c".attr)) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('a === 1) - val right = testRelation1 - val correctAnswer = - left.join(right, condition = Some("d".attr === "b".attr || "d".attr === "c".attr)).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: rewrite filter to push to either side") { + test("join with pre-existing filters: filter out nulls on either side") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y) - 
.where("x.b".attr === 1 && "y.b".attr === 2) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b === 1) - val right = testRelation.where('b === 2) - val correctAnswer = - left.join(right).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down left semi join") { - val x = testRelation.subquery('x) - val y = testRelation1.subquery('y) - - val originalQuery = { - x.join(y, LeftSemi, Option("x.a".attr === "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2)) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b >= 1) - val right = testRelation1.where('d >= 2) - val correctAnswer = - left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down left outer join #1") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, LeftOuter) - .where("x.b".attr === 1 && "y.b".attr === 2) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b === 1) - val correctAnswer = - left.join(y, LeftOuter).where("y.b".attr === 2).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down right outer join #1") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, RightOuter) - .where("x.b".attr === 1 && "y.b".attr === 2) - } - + val originalQuery = x.where('b > 5).join(y.where('c === 10), + condition = Some("x.a".attr === "y.a".attr)) + val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5) + val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10) + val correctAnswer = left.join(right, + condition = Some("x.a".attr === "y.a".attr)).analyze val optimized = Optimize.execute(originalQuery.analyze) - val right = testRelation.where('b === 2).subquery('d) - val correctAnswer = - x.join(right, 
RightOuter).where("x.b".attr === 1).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down left outer join #2") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, LeftOuter, Some("x.b".attr === 1)) - .where("x.b".attr === 2 && "y.b".attr === 2) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b === 2).subquery('d) - val correctAnswer = - left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down right outer join #2") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, RightOuter, Some("y.b".attr === 1)) - .where("x.b".attr === 2 && "y.b".attr === 2) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val right = testRelation.where('b === 2).subquery('d) - val correctAnswer = - x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down left outer join #3") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, LeftOuter, Some("y.b".attr === 1)) - .where("x.b".attr === 2 && "y.b".attr === 2) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b === 2).subquery('l) - val right = testRelation.where('b === 1).subquery('r) - val correctAnswer = - left.join(right, LeftOuter).where("r.b".attr === 2).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down right outer join #3") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, RightOuter, Some("y.b".attr === 1)) - .where("x.b".attr === 2 && "y.b".attr === 2) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val right = 
testRelation.where('b === 2).subquery('r) - val correctAnswer = - x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down left outer join #4") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, LeftOuter, Some("y.b".attr === 1)) - .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b === 2).subquery('l) - val right = testRelation.where('b === 1).subquery('r) - val correctAnswer = - left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down right outer join #4") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, RightOuter, Some("y.b".attr === 1)) - .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.subquery('l) - val right = testRelation.where('b === 2).subquery('r) - val correctAnswer = - left.join(right, RightOuter, Some("r.b".attr === 1)). - where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down left outer join #5") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3)) - .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b === 2).subquery('l) - val right = testRelation.where('b === 1).subquery('r) - val correctAnswer = - left.join(right, LeftOuter, Some("l.a".attr===3)). 
- where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: push down right outer join #5") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3)) - .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('a === 3).subquery('l) - val right = testRelation.where('b === 2).subquery('r) - val correctAnswer = - left.join(right, RightOuter, Some("r.b".attr === 1)). - where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze - - comparePlans(optimized, correctAnswer) - } - - test("joins: can't push down") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y, condition = Some("x.b".attr === "y.b".attr)) - } - val optimized = Optimize.execute(originalQuery.analyze) - - comparePlans(analysis.EliminateSubqueryAliases(originalQuery.analyze), optimized) - } - - test("joins: conjunctive predicates") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y) - .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1) && ("y.a".attr === 1)) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('a === 1).subquery('x) - val right = testRelation.where('a === 1).subquery('y) - val correctAnswer = - left.join(right, condition = Some("x.b".attr === "y.b".attr)) - .analyze - - comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) - } - - test("joins: conjunctive predicates #2") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - - val originalQuery = { - x.join(y) - .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1)) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val left = 
testRelation.where('a === 1).subquery('x) - val right = testRelation.subquery('y) - val correctAnswer = - left.join(right, condition = Some("x.b".attr === "y.b".attr)) - .analyze - - comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) - } - - test("joins: conjunctive predicates #3") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - val z = testRelation.subquery('z) - - val originalQuery = { - z.join(x.join(y)) - .where(("x.b".attr === "y.b".attr) && ("x.a".attr === 1) && - ("z.a".attr >= 3) && ("z.a".attr === "x.b".attr)) - } - - val optimized = Optimize.execute(originalQuery.analyze) - val lleft = testRelation.where('a >= 3).subquery('z) - val left = testRelation.where('a === 1).subquery('x) - val right = testRelation.subquery('y) - val correctAnswer = - lleft.join( - left.join(right, condition = Some("x.b".attr === "y.b".attr)), - condition = Some("z.a".attr === "x.b".attr)) - .analyze - - comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) - } - - val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType)) - - test("generate: predicate referenced no generated column") { - val originalQuery = { - testRelationWithArrayType - .generate(Explode('c_arr), true, false, Some("arr")) - .where(('b >= 5) && ('a > 6)) - } - val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = { - testRelationWithArrayType - .where(('b >= 5) && ('a > 6)) - .generate(Explode('c_arr), true, false, Some("arr")).analyze - } - - comparePlans(optimized, correctAnswer) - } - - test("generate: part of conjuncts referenced generated column") { - val generator = Explode('c_arr) - val originalQuery = { - testRelationWithArrayType - .generate(generator, true, false, Some("arr")) - .where(('b >= 5) && ('c > 6)) - } - val optimized = Optimize.execute(originalQuery.analyze) - val referenceResult = { - testRelationWithArrayType - .where('b >= 5) - .generate(generator, true, false, 
Some("arr")) - .where('c > 6).analyze - } - - // Since newly generated columns get different ids every time being analyzed - // e.g. comparePlans(originalQuery.analyze, originalQuery.analyze) fails. - // So we check operators manually here. - // Filter("c" > 6) - assertResult(classOf[Filter])(optimized.getClass) - assertResult(1)(optimized.asInstanceOf[Filter].condition.references.size) - assertResult("c"){ - optimized.asInstanceOf[Filter].condition.references.toSeq(0).name - } - - // the rest part - comparePlans(optimized.children(0), referenceResult.children(0)) - } - - test("generate: all conjuncts referenced generated column") { - val originalQuery = { - testRelationWithArrayType - .generate(Explode('c_arr), true, false, Some("arr")) - .where(('c > 6) || ('b > 5)).analyze - } - val optimized = Optimize.execute(originalQuery) - - comparePlans(optimized, originalQuery) - } - - test("push project and filter down into sample") { - val x = testRelation.subquery('x) - val originalQuery = - Sample(0.0, 0.6, false, 11L, x)().select('a) - - val originalQueryAnalyzed = - EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery)) - - val optimized = Optimize.execute(originalQueryAnalyzed) - - val correctAnswer = - Sample(0.0, 0.6, false, 11L, x.select('a))() - - comparePlans(optimized, correctAnswer.analyze) - } - - test("aggregate: push down filter when filter on group by expression") { - val originalQuery = testRelation - .groupBy('a)('a, count('b) as 'c) - .select('a, 'c) - .where('a === 2) - - val optimized = Optimize.execute(originalQuery.analyze) - - val correctAnswer = testRelation - .where('a === 2) - .groupBy('a)('a, count('b) as 'c) - .analyze - comparePlans(optimized, correctAnswer) - } - - test("aggregate: don't push down filter when filter not on group by expression") { - val originalQuery = testRelation - .select('a, 'b) - .groupBy('a)('a, count('b) as 'c) - .where('c === 2L) - - val optimized = Optimize.execute(originalQuery.analyze) - - 
comparePlans(optimized, originalQuery.analyze) - } - - test("aggregate: push down filters partially which are subset of group by expressions") { - val originalQuery = testRelation - .select('a, 'b) - .groupBy('a)('a, count('b) as 'c) - .where('c === 2L && 'a === 3) - - val optimized = Optimize.execute(originalQuery.analyze) - - val correctAnswer = testRelation - .select('a, 'b) - .where('a === 3) - .groupBy('a)('a, count('b) as 'c) - .where('c === 2L) - .analyze - - comparePlans(optimized, correctAnswer) - } - - test("aggregate: push down filters with alias") { - val originalQuery = testRelation - .select('a, 'b) - .groupBy('a)(('a + 1) as 'aa, count('b) as 'c) - .where(('c === 2L || 'aa > 4) && 'aa < 3) - - val optimized = Optimize.execute(originalQuery.analyze) - - val correctAnswer = testRelation - .select('a, 'b) - .where('a + 1 < 3) - .groupBy('a)(('a + 1) as 'aa, count('b) as 'c) - .where('c === 2L || 'aa > 4) - .analyze - comparePlans(optimized, correctAnswer) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index f9874088b588..0541844e0bfc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.util._ /** * Provides helper methods for comparing plans. */ -abstract class PlanTest extends SparkFunSuite { +abstract class PlanTest extends SparkFunSuite with PredicateHelper { /** * Since attribute references are given globally unique ids during analysis, * we must normalize them to check if two different queries are identical. @@ -39,10 +39,22 @@ abstract class PlanTest extends SparkFunSuite { } } + /** + * Normalizes the filter conditions that appear in the plan. 
For instance, + * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3), (expr 3 && (expr 1 && expr 2)) + * etc., will all now be equivalent. + */ + private def normalizeFilters(plan: LogicalPlan) = { + plan transform { + case filter @ Filter(condition: Expression, child: LogicalPlan) => + Filter(splitConjunctivePredicates(condition).sortBy(_.hashCode()).reduce(And), child) + } + } + /** Fails the test if the two plans do not match */ protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) { - val normalized1 = normalizeExprIds(plan1) - val normalized2 = normalizeExprIds(plan2) + val normalized1 = normalizeFilters(normalizeExprIds(plan1)) + val normalized2 = normalizeFilters(normalizeExprIds(plan2)) if (normalized1 != normalized2) { fail( s""" From 0b1520c6ec38e2c505335e70e248abc27efccd6f Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 2 Mar 2016 19:12:59 -0800 Subject: [PATCH 3/7] Fix OrcFilterSuite --- .../spark/sql/hive/orc/OrcFilterSuite.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala index c94e73c4aa30..6ca334dc6d5f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala @@ -61,8 +61,8 @@ class OrcFilterSuite extends QueryTest with OrcTest { (predicate: Predicate, filterOperator: PredicateLeaf.Operator) (implicit df: DataFrame): Unit = { def checkComparisonOperator(filter: SearchArgument) = { - val operator = filter.getLeaves.asScala.head.getOperator - assert(operator === filterOperator) + val operator = filter.getLeaves.asScala + assert(operator.map(_.getOperator).contains(filterOperator)) } checkFilterPredicate(df, predicate, checkComparisonOperator) } @@ -216,8 +216,9 @@ class OrcFilterSuite extends QueryTest with OrcTest { ) 
checkFilterPredicate( !('_1 < 4), - """leaf-0 = (LESS_THAN _1 4) - |expr = (not leaf-0)""".stripMargin.trim + """leaf-0 = (IS_NULL _1) + |leaf-1 = (LESS_THAN _1 4) + |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim ) checkFilterPredicate( '_1 < 2 || '_1 > 3, @@ -227,9 +228,10 @@ class OrcFilterSuite extends QueryTest with OrcTest { ) checkFilterPredicate( '_1 < 2 && '_1 > 3, - """leaf-0 = (LESS_THAN _1 2) - |leaf-1 = (LESS_THAN_EQUALS _1 3) - |expr = (and leaf-0 (not leaf-1))""".stripMargin.trim + """leaf-0 = (IS_NULL _1) + |leaf-1 = (LESS_THAN _1 2) + |leaf-2 = (LESS_THAN_EQUALS _1 3) + |expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim ) } } From 2a469e84a618ffe69580eaab6ca77e2a3ab1b7f7 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 2 Mar 2016 19:37:29 -0800 Subject: [PATCH 4/7] Fix SimpleTextHadoopFsRelationSuite --- .../spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index 9ab3e11609ce..e64bb77a03a5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -192,14 +192,14 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat } markup("Checking pushed filters") - assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet) + assert(pushedFilters.toSet.subsetOf(SimpleTextRelation.pushedFilters)) val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet markup("Checking unhandled and inconvertible filters") - 
assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters) + assert((expectedInconvertibleFilters ++ expectedUnhandledFilters).subsetOf(nonPushedFilters)) markup("Checking partitioning filters") val actualPartitioningFilters = splitConjunctivePredicates(filter.expr).filter { From 80dab7e8cda525f7a3a375b3b77de0c297008ffb Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Thu, 3 Mar 2016 11:43:02 -0800 Subject: [PATCH 5/7] Add isNotNull handling in SimpleTextRelation --- .../spark/sql/sources/SimpleTextRelation.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 9fc437bf8815..964ccb5eec51 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -140,12 +140,17 @@ class SimpleTextRelation( // Constructs a filter predicate to simulate filter push-down val predicate = { val filterCondition: Expression = filters.collect { - // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` filter + // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` and + // `isNotNull` filters case sources.GreaterThan(column, value) => val dataType = dataSchema(column).dataType val literal = Literal.create(value, dataType) val attribute = inputAttributes.find(_.name == column).get expressions.GreaterThan(attribute, literal) + case sources.IsNotNull(column) => + val dataType = dataSchema(column).dataType + val attribute = inputAttributes.find(_.name == column).get + expressions.IsNotNull(attribute) }.reduceOption(expressions.And).getOrElse(Literal(true)) InterpretedPredicate.create(filterCondition, inputAttributes) } @@ -183,11 +188,12 @@ class SimpleTextRelation( } } - // `SimpleTextRelation` only 
handles `GreaterThan` filter. This is used to test filter push-down - // and `BaseRelation.unhandledFilters()`. + // `SimpleTextRelation` only handles `GreaterThan` and `IsNotNull` filters. This is used to test + // filter push-down and `BaseRelation.unhandledFilters()`. override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { filters.filter { case _: GreaterThan => false + case _: IsNotNull => false case _ => true } } From 013f97a3af010974dcd14198ee5ce073ab73c9ce Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Thu, 3 Mar 2016 15:26:07 -0800 Subject: [PATCH 6/7] Fix PlannerSuite and ParquetFilterSuite --- .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 2 +- .../execution/datasources/parquet/ParquetFilterSuite.scala | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index f66e08e6ca5c..a733237a5e71 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -159,7 +159,7 @@ class PlannerSuite extends SharedSQLContext { withTempTable("testPushed") { val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan - assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]")) + assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index fbffe867e4b7..7a3216dc7c3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -74,10 
+74,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex selectedFilters.foreach { pred => val maybeFilter = ParquetFilters.createFilter(df.schema, pred) assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") - maybeFilter.foreach { f => - // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) - assert(f.getClass === filterClass) - } + // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) + assert(maybeFilter.exists(_.getClass === filterClass)) } checker(stripSparkFilter(query), expected) } From 31b17007d8c6b2883d8f8b2f3bd4c0576fe0ed00 Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Fri, 4 Mar 2016 15:29:38 -0800 Subject: [PATCH 7/7] Nong's comments --- .../sql/catalyst/optimizer/Optimizer.scala | 22 ++++----- .../optimizer/NullFilteringSuite.scala | 45 +++++++++++++++---- 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 125660788467..b5f8ab29c9c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -588,7 +588,7 @@ object NullPropagation extends Rule[LogicalPlan] { /** * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness - * by inserting isNotNull filters is the query plan. These filters are currently inserted beneath + * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath * existing Filters and Join operators and are inferred based on their data constraints. 
* * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and @@ -596,22 +596,20 @@ object NullPropagation extends Rule[LogicalPlan] { */ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case filter @ Filter(condition, child: LogicalPlan) => + case filter @ Filter(condition, child) => // We generate a list of additional isNotNull filters from the operator's existing constraints // but remove those that are either already part of the filter condition or are part of the // operator's child constraints. val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) -- (child.constraints ++ splitConjunctivePredicates(condition)) - val newCondition = if (newIsNotNullConstraints.nonEmpty) { - And(newIsNotNullConstraints.reduce(And), condition) + if (newIsNotNullConstraints.nonEmpty) { + Filter(And(newIsNotNullConstraints.reduce(And), condition), child) } else { - condition + filter } - Filter(newCondition, child) - case join @ Join(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, - condition: Option[Expression]) => - val leftIsNotNullConstraints = join.constraints + case join @ Join(left, right, joinType, condition) => + val leftIsNotNullConstraints = join.constraints .filter(_.isInstanceOf[IsNotNull]) .filter(_.references.subsetOf(left.outputSet)) -- left.constraints val rightIsNotNullConstraints = @@ -628,7 +626,11 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper { } else { right } - Join(newLeftChild, newRightChild, joinType, condition) + if (newLeftChild != left || newRightChild != right) { + Join(newLeftChild, newRightChild, joinType, condition) + } else { + join + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala index 
5f936f99c831..7e52d5ef6749 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala @@ -28,41 +28,68 @@ class NullFilteringSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("NullFiltering", Once, NullFiltering) :: - Batch("CombineFilters", Once, CombineFilters) :: Nil + Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) test("filter: filter out nulls in condition") { - val originalQuery = testRelation.where('a === 1) + val originalQuery = testRelation.where('a === 1).analyze val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze - val optimized = Optimize.execute(originalQuery.analyze) + val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } - test("join: filter out nulls on either side") { + test("single inner join: filter out nulls on either side on equi-join keys") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) val originalQuery = x.join(y, - condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)) + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze val left = x.where(IsNotNull('a) && IsNotNull('b)) val right = y.where(IsNotNull('a) && IsNotNull('c)) val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze - val optimized = Optimize.execute(originalQuery.analyze) + val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } - test("join with pre-existing filters: filter out nulls on either side") { + test("single inner join with pre-existing filters: filter out nulls on either side") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) val 
originalQuery = x.where('b > 5).join(y.where('c === 10), - condition = Some("x.a".attr === "y.a".attr)) + condition = Some("x.a".attr === "y.a".attr)).analyze val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5) val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10) val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze - val optimized = Optimize.execute(originalQuery.analyze) + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("single outer join: no null filters are generated") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, FullOuter, + condition = Some("x.a".attr === "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, originalQuery) + } + + test("multiple inner joins: filter out nulls on all sides on equi-join keys") { + val t1 = testRelation.subquery('t1) + val t2 = testRelation.subquery('t2) + val t3 = testRelation.subquery('t3) + val t4 = testRelation.subquery('t4) + + val originalQuery = t1 + .join(t2, condition = Some("t1.b".attr === "t2.b".attr)) + .join(t3, condition = Some("t2.b".attr === "t3.b".attr)) + .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze + val correctAnswer = t1.where(IsNotNull('b)) + .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr)) + .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr)) + .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze + val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } }