Skip to content

Commit f351362

Browse files
committed
address comments.
1 parent 98cab44 commit f351362

7 files changed

Lines changed: 18 additions & 12 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import scala.collection.JavaConverters._

 import com.google.common.util.concurrent.AtomicLongMap

-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.util.Utils
@@ -51,18 +52,18 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
     */
   abstract class Strategy {
     def maxIterations: Int
-    def throws: Boolean
+    def throwsExceptionUponMaxIterations: Boolean
   }

   /** A strategy that only runs once. */
   case object Once extends Strategy {
-    val maxIterations = 1
-    val throws = false
+    override val maxIterations = 1
+    override val throwsExceptionUponMaxIterations = false
   }

   /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
   case class FixedPoint(maxIterations: Int) extends Strategy {
-    override val throws: Boolean = if (Utils.isTesting) true else false
+    override val throwsExceptionUponMaxIterations: Boolean = if (Utils.isTesting) true else false
   }

   /** A batch of rules. */
   protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
@@ -108,7 +109,11 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
             val msg = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
-            if (batch.strategy.throws) throw new SparkException(msg) else logTrace(msg)
+            if (batch.strategy.throwsExceptionUponMaxIterations) {
+              throw new TreeNodeException(curPlan, msg, null)
+            } else {
+              logTrace(msg)
+            }
           }
           continue = false
         }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import org.apache.spark.sql.types.StringType
 class ColumnPruningSuite extends PlanTest {

   object Optimize extends RuleExecutor[LogicalPlan] {
-    System.setProperty("spark.testing", "true")
     val batches = Batch("Column pruning", FixedPoint(100),
       ColumnPruning,
       CollapseProject) :: Nil

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.rules._
 class CombiningLimitsSuite extends PlanTest {

   object Optimize extends RuleExecutor[LogicalPlan] {
-    System.setProperty("spark.testing", "true")
     val batches =
       Batch("Filter Pushdown", FixedPoint(100),
         ColumnPruning) ::

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.DateTimeUtils

 class ComputeCurrentTimeSuite extends PlanTest {
-  System.setProperty("spark.testing", "true")
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Seq(Batch("ComputeCurrentTime", Once, ComputeCurrentTime))
   }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import org.apache.spark.sql.types.IntegerType
 class SimplifyConditionalSuite extends PlanTest with PredicateHelper {

   object Optimize extends RuleExecutor[LogicalPlan] {
-    System.setProperty("spark.testing", "true")
     val batches =
       Batch("SimplifyConditionals", FixedPoint(50), SimplifyConditionals) :: Nil
   }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ import org.apache.spark.sql.catalyst.util._
  * Provides helper methods for comparing plans.
  */
 abstract class PlanTest extends SparkFunSuite with PredicateHelper {
+
+  System.setProperty("spark.testing", "true")
+
   /**
    * Since attribute references are given globally unique ids during analysis,
    * we must normalize them to check if two different queries are identical.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@

 package org.apache.spark.sql.catalyst.trees

-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

 class RuleExecutorSuite extends SparkFunSuite {
@@ -50,7 +52,7 @@ class RuleExecutorSuite extends SparkFunSuite {
       val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
     }

-    val message = intercept[SparkException] {
+    val message = intercept[TreeNodeException[LogicalPlan]] {
       ToFixedPoint.execute(Literal(100))
     }.getMessage
     assert(message.contains("Max iterations (10) reached for batch fixedPoint"))

0 commit comments

Comments (0)