Skip to content

Commit 23050c8

Browse files
gatorsmile authored and cloud-fan committed
[SPARK-17897][SQL][BACKPORT-2.0] Fixed IsNotNull Constraint Inference Rule
### What changes were proposed in this pull request? This PR is to backport #16067 to Spark 2.0 ---- The `constraints` of an operator is the expressions that evaluate to `true` for all the rows produced. That means, the expression result should be neither `false` nor `unknown` (NULL). Thus, we can conclude that `IsNotNull` on all the constraints, which are generated by its own predicates or propagated from the children. The constraint can be a complex expression. For better usage of these constraints, we try to push down `IsNotNull` to the lowest-level expressions (i.e., `Attribute`). `IsNotNull` can be pushed through an expression when it is null intolerant. (When the input is NULL, the null-intolerant expression always evaluates to NULL.) Below is the existing code we have for `IsNotNull` pushdown. ```Scala private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match { case a: Attribute => Seq(a) case _: NullIntolerant | IsNotNull(_: NullIntolerant) => expr.children.flatMap(scanNullIntolerantExpr) case _ => Seq.empty[Attribute] } ``` **`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the expression does not include any `Not`-like expression, it works; otherwise, it could generate a wrong result. This PR is to fix the above function by removing the `IsNotNull` from the inference. After the fix, when a constraint has a `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears in the root. Without the fix, the following test case will return empty. 
```Scala val data = Seq[java.lang.Integer](1, null).toDF("key") data.filter("not key is not null").show() ``` Before the fix, the optimized plan is like ``` == Optimized Logical Plan == Project [value#1 AS key#3] +- Filter (isnotnull(value#1) && NOT isnotnull(value#1)) +- LocalRelation [value#1] ``` After the fix, the optimized plan is like ``` == Optimized Logical Plan == Project [value#1 AS key#3] +- Filter NOT isnotnull(value#1) +- LocalRelation [value#1] ``` ### How was this patch tested? Added a test Author: Xiao Li <gatorsmile@gmail.com> Closes #16894 from gatorsmile/isNotNull2.0.
1 parent 00803cd commit 23050c8

3 files changed

Lines changed: 35 additions & 7 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 20 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -40,14 +40,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
4040
}
4141

4242
/**
43-
* Infers a set of `isNotNull` constraints from a given set of equality/comparison expressions as
44-
* well as non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
43+
* Infers a set of `isNotNull` constraints from null intolerant expressions as well as
44+
* non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
4545
* returns a constraint of the form `isNotNull(a)`
4646
*/
4747
private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = {
4848
// First, we propagate constraints from the null intolerant expressions.
49-
var isNotNullConstraints: Set[Expression] =
50-
constraints.flatMap(scanNullIntolerantExpr).map(IsNotNull(_))
49+
var isNotNullConstraints: Set[Expression] = constraints.flatMap(inferIsNotNullConstraints)
5150

5251
// Second, we infer additional constraints from non-nullable attributes that are part of the
5352
// operator's output
@@ -57,14 +56,28 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
5756
isNotNullConstraints -- constraints
5857
}
5958

59+
/**
60+
* Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
61+
* of constraints.
62+
*/
63+
private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
64+
constraint match {
65+
// When the root is IsNotNull, we can push IsNotNull through the child null intolerant
66+
// expressions
67+
case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
68+
// Constraints always return true for all the inputs. That means, null will never be returned.
69+
// Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
70+
// null intolerant expressions.
71+
case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
72+
}
73+
6074
/**
6175
* Recursively explores the expressions which are null intolerant and returns all attributes
6276
* in these expressions.
6377
*/
64-
private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
78+
private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
6579
case a: Attribute => Seq(a)
66-
case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
67-
expr.children.flatMap(scanNullIntolerantExpr)
80+
case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
6881
case _ => Seq.empty[Attribute]
6982
}
7083

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -351,6 +351,15 @@ class ConstraintPropagationSuite extends SparkFunSuite {
351351
IsNotNull(IsNotNull(resolveColumn(tr, "b"))),
352352
IsNotNull(resolveColumn(tr, "a")),
353353
IsNotNull(resolveColumn(tr, "c")))))
354+
355+
verifyConstraints(
356+
tr.where('a.attr === 1 && IsNotNull(resolveColumn(tr, "b")) &&
357+
IsNotNull(resolveColumn(tr, "c"))).analyze.constraints,
358+
ExpressionSet(Seq(
359+
resolveColumn(tr, "a") === 1,
360+
IsNotNull(resolveColumn(tr, "c")),
361+
IsNotNull(resolveColumn(tr, "a")),
362+
IsNotNull(resolveColumn(tr, "b")))))
354363
}
355364

356365
test("infer IsNotNull constraints from non-nullable attributes") {

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1649,6 +1649,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
16491649
expr = "cast((_1 + _2) as boolean)", expectedNonNullableColumns = Seq("_1", "_2"))
16501650
}
16511651

1652+
test("SPARK-17897: Fixed IsNotNull Constraint Inference Rule") {
1653+
val data = Seq[java.lang.Integer](1, null).toDF("key")
1654+
checkAnswer(data.filter(!$"key".isNotNull), Row(null))
1655+
checkAnswer(data.filter(!(- $"key").isNotNull), Row(null))
1656+
}
1657+
16521658
test("SPARK-17957: outer join + na.fill") {
16531659
val df1 = Seq((1, 2), (2, 3)).toDF("a", "b")
16541660
val df2 = Seq((2, 5), (3, 4)).toDF("a", "c")

0 commit comments

Comments
 (0)