Skip to content

Commit 0d589ba

Browse files
committed
[SPARK-20857][SQL] Generic resolved hint node
## What changes were proposed in this pull request? This patch renames BroadcastHint to ResolvedHint (and Hint to UnresolvedHint) so the hint framework is more generic and would allow us to introduce other hint types in the future without introducing new hint nodes. ## How was this patch tested? Updated test cases. Author: Reynold Xin <rxin@databricks.com> Closes #18072 from rxin/SPARK-20857.
1 parent ad09e4c commit 0d589ba

20 files changed

Lines changed: 118 additions & 80 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1336,7 +1336,7 @@ class Analyzer(
13361336

13371337
// Category 1:
13381338
// BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
1339-
case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
1339+
case _: ResolvedHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
13401340

13411341
// Category 2:
13421342
// These operators can be anywhere in a correlated subquery.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ trait CheckAnalysis extends PredicateHelper {
399399
|in operator ${operator.simpleString}
400400
""".stripMargin)
401401

402-
case _: Hint =>
402+
case _: UnresolvedHint =>
403403
throw new IllegalStateException(
404404
"Internal error: logical hint operator should have been removed during analysis")
405405

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ object ResolveHints {
5757
val newNode = CurrentOrigin.withOrigin(plan.origin) {
5858
plan match {
5959
case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
60-
BroadcastHint(plan)
60+
ResolvedHint(plan, isBroadcastable = Option(true))
6161
case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
62-
BroadcastHint(plan)
62+
ResolvedHint(plan, isBroadcastable = Option(true))
6363

64-
case _: BroadcastHint | _: View | _: With | _: SubqueryAlias =>
64+
case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
6565
// Don't traverse down these nodes.
6666
// For an existing broadcast hint, there is no point going down (if we do, we either
6767
// won't change the structure, or will introduce another broadcast hint that is useless.
@@ -85,10 +85,10 @@ object ResolveHints {
8585
}
8686

8787
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
88-
case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
88+
case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
8989
if (h.parameters.isEmpty) {
9090
// If there is no table alias specified, turn the entire subtree into a BroadcastHint.
91-
BroadcastHint(h.child)
91+
ResolvedHint(h.child, isBroadcastable = Option(true))
9292
} else {
9393
// Otherwise, find within the subtree query plans that should be broadcasted.
9494
applyBroadcastHint(h.child, h.parameters.toSet)
@@ -102,7 +102,7 @@ object ResolveHints {
102102
*/
103103
object RemoveAllHints extends Rule[LogicalPlan] {
104104
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
105-
case h: Hint => h.child
105+
case h: UnresolvedHint => h.child
106106
}
107107
}
108108

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
862862
// Note that some operators (e.g. project, aggregate, union) are being handled separately
863863
// (earlier in this rule).
864864
case _: AppendColumns => true
865-
case _: BroadcastHint => true
865+
case _: ResolvedHint => true
866866
case _: Distinct => true
867867
case _: Generate => true
868868
case _: Pivot => true

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
478478
case _: Distinct => true
479479
case _: AppendColumns => true
480480
case _: AppendColumnsWithObject => true
481-
case _: BroadcastHint => true
481+
case _: ResolvedHint => true
482482
case _: RepartitionByExpression => true
483483
case _: Repartition => true
484484
case _: Sort => true

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -533,13 +533,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
533533
}
534534

535535
/**
536-
* Add a [[Hint]] to a logical plan.
536+
* Add a [[UnresolvedHint]] to a logical plan.
537537
*/
538538
private def withHints(
539539
ctx: HintContext,
540540
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
541541
val stmt = ctx.hintStatement
542-
Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
542+
UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
543543
}
544544

545545
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ object PhysicalOperation extends PredicateHelper {
6565
val substitutedCondition = substitute(aliases)(condition)
6666
(fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
6767

68-
case BroadcastHint(child) =>
69-
collectProjectsAndFilters(child)
68+
case h: ResolvedHint =>
69+
collectProjectsAndFilters(h.child)
7070

7171
case other =>
7272
(None, Nil, other, Map.empty)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ case class Statistics(
6868
s"isBroadcastable=$isBroadcastable"
6969
).filter(_.nonEmpty).mkString(", ")
7070
}
71+
72+
/** Must be called when computing stats for a join operator to reset hints. */
73+
def resetHintsForJoin(): Statistics = copy(
74+
isBroadcastable = false
75+
)
7176
}
7277

7378

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ case class Join(
364364
case _ =>
365365
// Make sure we don't propagate isBroadcastable in other joins, because
366366
// they could explode the size.
367-
super.computeStats(conf).copy(isBroadcastable = false)
367+
super.computeStats(conf).resetHintsForJoin()
368368
}
369369

370370
if (conf.cboEnabled) {
@@ -375,26 +375,6 @@ case class Join(
375375
}
376376
}
377377

378-
/**
379-
* A hint for the optimizer that we should broadcast the `child` if used in a join operator.
380-
*/
381-
case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
382-
override def output: Seq[Attribute] = child.output
383-
384-
// set isBroadcastable to true so the child will be broadcasted
385-
override def computeStats(conf: SQLConf): Statistics =
386-
child.stats(conf).copy(isBroadcastable = true)
387-
}
388-
389-
/**
390-
* A general hint for the child. This node will be eliminated post analysis.
391-
* A pair of (name, parameters).
392-
*/
393-
case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode {
394-
override lazy val resolved: Boolean = false
395-
override def output: Seq[Attribute] = child.output
396-
}
397-
398378
/**
399379
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
400380
* concrete implementations during analysis.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans.logical
19+
20+
import org.apache.spark.sql.catalyst.expressions.Attribute
21+
import org.apache.spark.sql.internal.SQLConf
22+
23+
/**
24+
* A general hint for the child that is not yet resolved. This node is generated by the parser and
25+
* should be removed This node will be eliminated post analysis.
26+
* A pair of (name, parameters).
27+
*/
28+
case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan)
29+
extends UnaryNode {
30+
31+
override lazy val resolved: Boolean = false
32+
override def output: Seq[Attribute] = child.output
33+
}
34+
35+
/**
36+
* A resolved hint node. The analyzer should convert all [[UnresolvedHint]] into [[ResolvedHint]].
37+
*/
38+
case class ResolvedHint(
39+
child: LogicalPlan,
40+
isBroadcastable: Option[Boolean] = None)
41+
extends UnaryNode {
42+
43+
override def output: Seq[Attribute] = child.output
44+
45+
override def computeStats(conf: SQLConf): Statistics = {
46+
val stats = child.stats(conf)
47+
isBroadcastable.map(x => stats.copy(isBroadcastable = x)).getOrElse(stats)
48+
}
49+
}

0 commit comments

Comments
 (0)