-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-18582][SQL] Whitelist LogicalPlan operators allowed in correlated subqueries #16046
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
b988651
069ed8f
edca333
64184fd
29f82b0
ac43ab4
631d396
7eb9b2d
1387cf5
6d9bade
9a1f80b
3fe9429
0757b81
35b77f0
c63b8c6
f3351d5
9fc5c33
402e1d9
b117281
3023399
4b692f0
0d64512
c8aadb5
3f184ea
23e357c
d60f0de
2181647
599f54b
ca9e1a8
3f4c62a
c8588de
05fd7a3
1d32958
0c9d0b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1011,24 +1011,24 @@ class Analyzer( | |
| private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = { | ||
| val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]] | ||
|
|
||
| /** Make sure a plans' subtree does not contain a tagged predicate. */ | ||
| def failOnOuterReferenceInSubTree(p: LogicalPlan, msg: String): Unit = { | ||
| // Make sure a plan's subtree does not contain outer references | ||
| def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = { | ||
| if (p.collect(predicateMap).nonEmpty) { | ||
| failAnalysis(s"Accessing outer query column is not allowed in $msg: $p") | ||
| failAnalysis(s"Accessing outer query column is not allowed in:\n$p") | ||
| } | ||
| } | ||
|
|
||
| /** Helper function for locating outer references. */ | ||
| // Helper function for locating outer references. | ||
| def containsOuter(e: Expression): Boolean = { | ||
| e.find(_.isInstanceOf[OuterReference]).isDefined | ||
| } | ||
|
|
||
| /** Make sure a plans' expressions do not contain a tagged predicate. */ | ||
| // Make sure a plan's expressions do not contain outer references | ||
| def failOnOuterReference(p: LogicalPlan): Unit = { | ||
| if (p.expressions.exists(containsOuter)) { | ||
| failAnalysis( | ||
| "Expressions referencing the outer query are not supported outside of WHERE/HAVING " + | ||
| s"clauses: $p") | ||
| s"clauses:\n$p") | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1077,10 +1077,54 @@ class Analyzer( | |
|
|
||
| // Simplify the predicates before pulling them out. | ||
| val transformed = BooleanSimplification(sub) transformUp { | ||
| // WARNING: | ||
| // Only Filter can host correlated expressions at this time | ||
| // Anyone adding a new "case" below needs to add the call to | ||
| // "failOnOuterReference" to disallow correlated expressions in it. | ||
|
|
||
| // Whitelist operators allowed in a correlated subquery | ||
| // There are 4 categories: | ||
| // 1. Operators that are allowed anywhere in a correlated subquery, and, | ||
| // by definition of the operators, they cannot host outer references. | ||
| // 2. Operators that are allowed anywhere in a correlated subquery | ||
| // so long as they do not host outer references. | ||
| // 3. Operators that need special handling. These operators are | ||
| // Project, Filter, Join, Aggregate, and Generate. | ||
| // | ||
| // 4. Any other operators, i.e., those not in the above list, are allowed | ||
| // in a correlated subquery only if they are not on a correlation path. | ||
| // In other words, these operators are allowed only under a correlation point. | ||
| // | ||
| // A correlation path is defined as the sub-tree of all the operators that | ||
| // are on the path from the operator hosting the correlated expressions | ||
| // up to the operator producing the correlated values. | ||
|
|
||
| // Category 1: | ||
| // Leaf node can be anywhere in a correlated subquery. | ||
|
||
| case n: LeafNode => | ||
| n | ||
| // Category 2: | ||
| // These operators can be anywhere in a correlated subquery | ||
| // so long as they do not host outer references in the operators. | ||
| // SubqueryAlias can be anywhere in a correlated subquery. | ||
| case p: SubqueryAlias => | ||
|
||
| failOnOuterReference(p) | ||
| p | ||
| case p: Distinct => | ||
| failOnOuterReference(p) | ||
| p | ||
| case p: Sort => | ||
| failOnOuterReference(p) | ||
| p | ||
| case p: Repartition => | ||
| failOnOuterReference(p) | ||
| p | ||
| case p: RedistributeData => | ||
| failOnOuterReference(p) | ||
| p | ||
| case p: BroadcastHint => | ||
| failOnOuterReference(p) | ||
| p | ||
|
|
||
| // Category 3: | ||
| // Filter is one of the two operators allowed to host correlated expressions. | ||
| // The other operator is Join. Filter can be anywhere in a correlated subquery. | ||
| case f @ Filter(cond, child) => | ||
| // Find all predicates with an outer reference. | ||
| val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter) | ||
|
|
@@ -1102,6 +1146,9 @@ class Analyzer( | |
| predicateMap += child -> xs | ||
| child | ||
| } | ||
|
|
||
| // Project cannot host any correlated expressions | ||
| // but can be anywhere in a correlated subquery. | ||
| case p @ Project(expressions, child) => | ||
| failOnOuterReference(p) | ||
| val referencesToAdd = missingReferences(p) | ||
|
|
@@ -1110,6 +1157,12 @@ class Analyzer( | |
| } else { | ||
| p | ||
| } | ||
|
|
||
| // Aggregate cannot host any correlated expressions | ||
| // It can be on a correlation path if the correlation has | ||
| // only equality correlated predicates. | ||
| // It cannot be on a correlation path if the correlation has | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: has -> contains?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will change. |
||
| // non-equality correlated predicates. | ||
| case a @ Aggregate(grouping, expressions, child) => | ||
| failOnOuterReference(a) | ||
| failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a) | ||
|
|
@@ -1120,47 +1173,54 @@ class Analyzer( | |
| } else { | ||
| a | ||
| } | ||
| case w : Window => | ||
| failOnOuterReference(w) | ||
| failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w) | ||
| w | ||
| case j @ Join(left, _, RightOuter, _) => | ||
| failOnOuterReference(j) | ||
| failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN") | ||
| j | ||
| // SPARK-18578: Do not allow any correlated predicate | ||
| // in a Full (Outer) Join operator and its descendants | ||
| case j @ Join(_, _, FullOuter, _) => | ||
| failOnOuterReferenceInSubTree(j, "a FULL OUTER JOIN") | ||
| j | ||
| case j @ Join(_, right, jt, _) if !jt.isInstanceOf[InnerLike] => | ||
| failOnOuterReference(j) | ||
| failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN") | ||
|
|
||
| // Join can host correlated expressions. | ||
| case j @ Join(left, right, joinType, _) => | ||
| joinType match { | ||
| // Inner join, like Filter, can be anywhere. | ||
| // LeftSemi is a special case of Inner join which returns | ||
| // only the first matched row to the right table. | ||
| case _: InnerLike | LeftSemi => | ||
|
||
| failOnOuterReference(j) | ||
|
|
||
| // Left outer join's right operand cannot be on a correlation path. | ||
| // LeftAnti and ExistenceJoin are special cases of LeftOuter. | ||
| // Note that ExistenceJoin cannot be expressed externally in either SQL or DataFrame | ||
| // so it should not show up here in Analysis phase. This is just a safety net. | ||
| case LeftOuter | LeftAnti | ExistenceJoin(_) => | ||
| failOnOuterReference(j) | ||
| failOnOuterReferenceInSubTree(right) | ||
|
|
||
| // Likewise, Right outer join's left operand cannot be on a correlation path. | ||
| case RightOuter => | ||
| failOnOuterReference(j) | ||
| failOnOuterReferenceInSubTree(left) | ||
|
|
||
| // Any other join types not explicitly listed above, | ||
| // including Full outer join, are treated as Category 4. | ||
| case _ => | ||
| failOnOuterReferenceInSubTree(j) | ||
| } | ||
| j | ||
| case u: Union => | ||
| failOnOuterReferenceInSubTree(u, "a UNION") | ||
| u | ||
| case s: SetOperation => | ||
| failOnOuterReferenceInSubTree(s.right, "an INTERSECT/EXCEPT") | ||
| s | ||
| case e: Expand => | ||
| failOnOuterReferenceInSubTree(e, "an EXPAND") | ||
| e | ||
| case l : LocalLimit => | ||
| failOnOuterReferenceInSubTree(l, "a LIMIT") | ||
| l | ||
| // Since LIMIT <n> is represented as GlobalLimit(<n>, (LocalLimit (<n>, child)) | ||
| // and we are walking bottom up, we will fail on LocalLimit before | ||
| // reaching GlobalLimit. | ||
| // The code below is just a safety net. | ||
| case g : GlobalLimit => | ||
| failOnOuterReferenceInSubTree(g, "a LIMIT") | ||
| g | ||
| case s : Sample => | ||
| failOnOuterReferenceInSubTree(s, "a TABLESAMPLE") | ||
| s | ||
|
|
||
| // Generator with join=true, i.e., expressed with | ||
| // LATERAL VIEW [OUTER], similar to inner join, | ||
| // allows correlation under it | ||
| // but must not host any outer references. | ||
| // Note: | ||
| // Generator with join=false is treated as Category 4. | ||
| case p @ Generate(generator, join, _, _, _, _) if (join) => | ||
|
||
| if (containsOuter(generator)) { | ||
|
||
| failOnOuterReference(p) | ||
| } | ||
| p | ||
|
|
||
| // Category 4: Any other operators not in the above 3 categories | ||
| // cannot be on a correlation path, that is they are allowed only | ||
| // under a correlation point but they and their descendant operators | ||
| // are not allowed to have any correlated expressions. | ||
| case p => | ||
| failOnOuterReference(p) | ||
| failOnOuterReferenceInSubTree(p) | ||
| p | ||
| } | ||
| (transformed, predicateMap.values.flatten.toSeq) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's change this line into
`p.collectFirst(predicateMap).nonEmpty`; that is a little more efficient. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I will make the change in the next PR.