Commit 3b98a0f

[SPARK-25415][SQL] Make plan change log in RuleExecutor configurable by SQLConf
1 parent 3030b82 commit 3b98a0f

3 files changed

Lines changed: 200 additions & 5 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala

Lines changed: 28 additions & 5 deletions
@@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
 object RuleExecutor {
@@ -72,6 +73,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   def execute(plan: TreeType): TreeType = {
     var curPlan = plan
     val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
+    val planChangeLogger = new PlanChangeLogger()
 
     batches.foreach { batch =>
       val batchStartPlan = curPlan
@@ -90,11 +92,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
             if (!result.fastEquals(plan)) {
               queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
               queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
-              logTrace(
-                s"""
-                  |=== Applying Rule ${rule.ruleName} ===
-                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
-                """.stripMargin)
+              planChangeLogger.log(rule.ruleName, plan, result)
             }
             queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
             queryExecutionMetrics.incNumExecution(rule.ruleName)
@@ -143,4 +141,29 @@
 
     curPlan
   }
+
+  private class PlanChangeLogger {
+
+    private val logLevel = SQLConf.get.optimizerPlanChangeLogLevel.toUpperCase
+
+    private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq)
+
+    def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
+      if (logRules.isEmpty || logRules.get.contains(ruleName)) {
+        lazy val message =
+          s"""
+             |=== Applying Rule ${ruleName} ===
+             |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+           """.stripMargin
+        logLevel match {
+          case "TRACE" => logTrace(message)
+          case "DEBUG" => logDebug(message)
+          case "INFO" => logInfo(message)
+          case "WARN" => logWarning(message)
+          case "ERROR" => logError(message)
+          case _ => logTrace(message)
+        }
+      }
+    }
+  }
 }
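
A detail worth noting in the PlanChangeLogger above: Spark's Logging trait takes log messages by name (msg: => String), so combined with the lazy val, the expensive sideBySide rendering runs only if the configured level is actually enabled, and at most once. A minimal standalone sketch of that interplay, with illustrative names only (this is not Spark code):

object LazyMessageDemo extends App {
  var renders = 0
  def expensiveRender(): String = { renders += 1; "=== Applying Rule ... ===" }

  // Mirrors org.apache.spark.internal.Logging: the message parameter is
  // by-name, so it is never evaluated when the sink is disabled.
  def logAt(enabled: Boolean)(msg: => String): Unit = if (enabled) println(msg)

  lazy val message = expensiveRender()
  logAt(enabled = false)(message) // never rendered
  logAt(enabled = true)(message)  // rendered exactly once here
  logAt(enabled = true)(message)  // lazy val caches: no second render
  println(s"renders = $renders")  // prints: renders = 1
}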

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 24 additions & 0 deletions
@@ -171,6 +171,26 @@ object SQLConf {
     .intConf
     .createWithDefault(10)
 
+  val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
+    .internal()
+    .doc("Configures the log level for logging the change from the original plan to the new " +
+      "plan after a rule is applied. The value can be 'trace', 'debug', 'info', 'warn', or " +
+      "'error'. The default log level is 'trace'.")
+    .stringConf
+    .checkValue(
+      str => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(str.toUpperCase),
+      "Invalid value for 'spark.sql.optimizer.planChangeLog.level'. Valid values are " +
+        "'trace', 'debug', 'info', 'warn' and 'error'.")
+    .createWithDefault("trace")
+
+  val OPTIMIZER_PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.optimizer.planChangeLog.rules")
+    .internal()
+    .doc("If this configuration is set, the optimizer will only log plan changes caused by " +
+      "applying the rules specified in this configuration. The value can be a list of rule " +
+      "names separated by comma.")
+    .stringConf
+    .createOptional
+
   val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
     .doc("When set to true Spark SQL will automatically select a compression codec for each " +
       "column based on statistics of the data.")
@@ -1570,6 +1590,10 @@ class SQLConf extends Serializable with Logging {
 
   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
 
+  def optimizerPlanChangeLogLevel: String = getConf(OPTIMIZER_PLAN_CHANGE_LOG_LEVEL)
+
+  def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)
+
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
 
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
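
With both confs in place, the feature can be switched on like any other SQL conf. A hypothetical usage sketch: the two planChangeLog keys come from this commit, while the session setup and the fully qualified rule name are assumptions based on standard Spark conventions (Catalyst's Rule.ruleName defaults to the rule's class name):

import org.apache.spark.sql.SparkSession

object PlanChangeLogDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("plan-change-log-demo")
    // Log plan changes at INFO instead of the default TRACE, so they are
    // visible with stock log4j settings.
    .config("spark.sql.optimizer.planChangeLog.level", "info")
    // Only log changes made by this one rule (comma-separated list).
    .config("spark.sql.optimizer.planChangeLog.rules",
      "org.apache.spark.sql.catalyst.optimizer.ColumnPruning")
    .getOrCreate()

  // Any query that exercises the optimizer will trigger the logging.
  spark.range(10).selectExpr("id", "id * 2 AS twice").where("id > 3").collect()
  spark.stop()
}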
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.log4j.{Appender, AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf
+
+class OptimizerLoggingSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Optimizer Batch", FixedPoint(100),
+      PushDownPredicate,
+      ColumnPruning,
+      CollapseProject) :: Nil
+  }
+
+  class MockAppender extends AppenderSkeleton {
+    val loggingEvents = new ArrayBuffer[LoggingEvent]()
+
+    override def append(loggingEvent: LoggingEvent): Unit = {
+      if (loggingEvent.getRenderedMessage().contains("Applying Rule")) {
+        loggingEvents.append(loggingEvent)
+      }
+    }
+
+    override def close(): Unit = {}
+    override def requiresLayout(): Boolean = false
+  }
+
+  private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
+    val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
+    val restoreLevel = logger.getLevel
+    logger.setLevel(level)
+    logger.addAppender(appender)
+    try f finally {
+      logger.setLevel(restoreLevel)
+      logger.removeAppender(appender)
+    }
+  }
+
+  private def verifyLog(expectedLevel: Level, expectedRules: Seq[String]): Unit = {
+    val logAppender = new MockAppender()
+    withLogLevelAndAppender(Level.TRACE, logAppender) {
+      val input = LocalRelation('a.int, 'b.string, 'c.double)
+      val query = input.select('a, 'b).select('a).where('a > 1).analyze
+      val expected = input.where('a > 1).select('a).analyze
+      comparePlans(Optimize.execute(query), expected)
+    }
+    val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
+    assert(expectedRules.forall(rule => logMessages.exists(_.contains(rule))))
+    assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
+  }
+
+  test("test log level") {
+    val levels = Seq(
+      "TRACE" -> Level.TRACE,
+      "trace" -> Level.TRACE,
+      "DEBUG" -> Level.DEBUG,
+      "debug" -> Level.DEBUG,
+      "INFO" -> Level.INFO,
+      "info" -> Level.INFO,
+      "WARN" -> Level.WARN,
+      "warn" -> Level.WARN,
+      "ERROR" -> Level.ERROR,
+      "error" -> Level.ERROR,
+      "deBUG" -> Level.DEBUG)
+
+    levels.foreach { level =>
+      withSQLConf(SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> level._1) {
+        verifyLog(
+          level._2,
+          Seq(
+            PushDownPredicate.ruleName,
+            ColumnPruning.ruleName,
+            CollapseProject.ruleName))
+      }
+    }
+  }
+
+  test("test invalid log level conf") {
+    val levels = Seq(
+      "",
+      "*d_",
+      "infoo")
+
+    levels.foreach { level =>
+      val error = intercept[IllegalArgumentException] {
+        withSQLConf(SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> level) {}
+      }
+      assert(error.getMessage.contains(
+        "Invalid value for 'spark.sql.optimizer.planChangeLog.level'."))
+    }
+  }
+
+  test("test log rules") {
+    val rulesSeq = Seq(
+      Seq(PushDownPredicate.ruleName,
+        ColumnPruning.ruleName,
+        CollapseProject.ruleName).reduce(_ + "," + _) ->
+        Seq(PushDownPredicate.ruleName,
+          ColumnPruning.ruleName,
+          CollapseProject.ruleName),
+      Seq(PushDownPredicate.ruleName,
+        ColumnPruning.ruleName).reduce(_ + "," + _) ->
+        Seq(PushDownPredicate.ruleName,
+          ColumnPruning.ruleName),
+      CollapseProject.ruleName ->
+        Seq(CollapseProject.ruleName),
+      Seq(ColumnPruning.ruleName,
+        "DummyRule").reduce(_ + "," + _) ->
+        Seq(ColumnPruning.ruleName),
+      "DummyRule" -> Seq(),
+      "" -> Seq()
+    )
+
+    rulesSeq.foreach { case (rulesConf, expectedRules) =>
+      withSQLConf(
+        SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_RULES.key -> rulesConf,
+        SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
+        verifyLog(Level.INFO, expectedRules)
+      }
+    }
+  }
+}
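
The capture pattern the suite relies on (attach a log4j 1.x AppenderSkeleton to a logger, run the code under test, then assert on the recorded events) also works outside Spark. A minimal standalone sketch under the same log4j 1.x assumption, with illustrative names:

import scala.collection.mutable.ArrayBuffer

import org.apache.log4j.{AppenderSkeleton, Level, Logger}
import org.apache.log4j.spi.LoggingEvent

object AppenderCaptureDemo extends App {
  class CapturingAppender extends AppenderSkeleton {
    val events = new ArrayBuffer[LoggingEvent]()
    override def append(e: LoggingEvent): Unit = events.append(e)
    override def close(): Unit = {}
    override def requiresLayout(): Boolean = false
  }

  val logger = Logger.getLogger("demo")
  val appender = new CapturingAppender
  logger.setLevel(Level.INFO)
  logger.addAppender(appender)
  try logger.info("=== Applying Rule Demo ===")
  finally logger.removeAppender(appender)

  // The event is recorded with its level and rendered message intact.
  assert(appender.events.exists(_.getRenderedMessage.contains("Applying Rule")))
  assert(appender.events.forall(_.getLevel == Level.INFO))
}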
