From adc4f8efd4b51b77d3600bcba8f331e92f7ea3c6 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Sun, 18 Nov 2018 00:29:16 -0600
Subject: [PATCH 1/8] Added maxDepth to treeString which limits the depth of
 the printed string.

---
 .../spark/sql/catalyst/trees/TreeNode.scala   |  2 +-
 .../spark/sql/execution/QueryExecution.scala  | 45 -------
 .../sql/execution/WholeStageCodegenExec.scala |  1 +
 3 files changed, 2 insertions(+), 46 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 102721616500..3468e27f5ce4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -75,7 +75,7 @@ object CurrentOrigin {
 }
 
 // scalastyle:off
-abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
+abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Logging {
 // scalastyle:on
   self: BaseType =>
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 905d035b6427..a8c4333eeefa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,13 +17,9 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.{BufferedWriter, OutputStreamWriter, Writer}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
-import org.apache.commons.io.output.StringBuilderWriter
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -193,30 +189,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     """.stripMargin.trim
   }
 
-  private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
-    try f(writer)
-    catch {
-      case e: AnalysisException => writer.write(e.toString)
-    }
-  }
-
-  private def writePlans(writer: Writer): Unit = {
-    val (verbose, addSuffix) = (true, false)
-
-    writer.write("== Parsed Logical Plan ==\n")
-    writeOrError(writer)(logical.treeString(_, verbose, addSuffix))
-    writer.write("\n== Analyzed Logical Plan ==\n")
-    val analyzedOutput = stringOrError(Utils.truncatedString(
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", "))
-    writer.write(analyzedOutput)
-    writer.write("\n")
-    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix))
-    writer.write("\n== Optimized Logical Plan ==\n")
-    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix))
-    writer.write("\n== Physical Plan ==\n")
-    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix))
-  }
-
   override def toString: String = withRedaction {
     val writer = new StringBuilderWriter()
     try {
@@ -269,22 +241,5 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   def codegenToSeq(): Seq[(String, String)] = {
     org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
   }
-
-  /**
-   * Dumps debug information about query execution into the specified file.
-   */
-  def toFile(path: String): Unit = {
-    val filePath = new Path(path)
-    val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-    val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
-
-    try {
-      writePlans(writer)
-      writer.write("\n== Whole Stage Codegen ==\n")
-      org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
-    } finally {
-      writer.close()
-    }
-  }
 }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 29bcbcae366c..611e59bc3d00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLMetrics

From 3a9743fbc89358055c37cc45437f191fc5f15957 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Sun, 18 Nov 2018 06:34:42 +0000
Subject: [PATCH 2/8] Added maxDepth to treeString which limits the depth of
 the printed string.

---
 .../spark/sql/catalyst/trees/TreeNode.scala   | 102 +++++++++++++-----
 .../spark/sql/execution/QueryExecution.scala  |  24 +++--
 .../sql/execution/WholeStageCodegenExec.scala |  17 ++-
 .../sql/execution/QueryExecutionSuite.scala   |  14 +++
 4 files changed, 117 insertions(+), 40 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 3468e27f5ce4..f2aff216eca3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.trees
 
 import java.io.Writer
 import java.util.UUID
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.Map
 import scala.reflect.ClassTag
@@ -29,6 +30,8 @@ import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.ScalaReflection._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -480,14 +483,22 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log
     }
   }
 
-  def treeString(
+  def treeString(
+      writer: Writer,
+      verbose: Boolean,
+      addSuffix: Boolean): Unit = {
+    treeString(writer, verbose, addSuffix, TreeNode.maxTreeToStringDepth)
+  }
+
+  def treeString(
       writer: Writer,
       verbose: Boolean,
-      addSuffix: Boolean): Unit = {
-    generateTreeString(0, Nil, writer, verbose, "", addSuffix)
+      addSuffix: Boolean,
+      maxDepth: Int): Unit = {
+    generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxDepth)
   }
 
-  /**
+  /**
    * Returns a string representation of the nodes in this tree, where each operator is numbered.
    * The numbers can be used with [[TreeNode.apply]] to easily access specific subtrees.
    *
@@ -550,7 +561,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log
       writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): Unit = {
+      addSuffix: Boolean = false,
+      maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {
 
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
@@ -559,30 +571,42 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log
       writer.write(if (lastChildren.last) "+- " else ":- ")
     }
 
-    val str = if (verbose) {
-      if (addSuffix) verboseStringWithSuffix else verboseString
-    } else {
-      simpleString
-    }
-    writer.write(prefix)
-    writer.write(str)
-    writer.write("\n")
-
-    if (innerChildren.nonEmpty) {
-      innerChildren.init.foreach(_.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
-        addSuffix = addSuffix))
-      innerChildren.last.generateTreeString(
-        depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
-        addSuffix = addSuffix)
-    }
-
-    if (children.nonEmpty) {
-      children.init.foreach(_.generateTreeString(
-        depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix))
-      children.last.generateTreeString(
-        depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix)
-    }
+    if (depth < maxDepth) {
+      val str = if (verbose) {
+        if (addSuffix) verboseStringWithSuffix else verboseString
+      } else {
+        simpleString
+      }
+      writer.write(prefix)
+      writer.write(str)
+      writer.write("\n")
+
+      if (innerChildren.nonEmpty) {
+        innerChildren.init.foreach(_.generateTreeString(
+          depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
+          addSuffix = addSuffix, maxDepth = maxDepth))
+        innerChildren.last.generateTreeString(
+          depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
+          addSuffix = addSuffix, maxDepth = maxDepth)
+      }
+
+      if (children.nonEmpty) {
+        children.init.foreach(_.generateTreeString(
+          depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxDepth))
+        children.last.generateTreeString(
+          depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxDepth)
+      }
+    }
+    else {
+      if (TreeNode.treeDepthWarningPrinted.compareAndSet(false, true)) {
+        logWarn(
+          "Truncated the string representation of a plan since it was nested too deeply. " +
+          "This behavior can be adjusted by setting 'spark.debug.maxToStringTreeDepth' in " +
+          "SparkEnv.conf.")
+      }
+      writer.write(prefix)
+      writer.write("...\n")
+    }
   }
 
   /**
@@ -701,3 +725,23 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log
     case _ => false
   }
 }
+
+object TreeNode {
+  /**
+   * Query plans for large, deeply nested plans can get extremely large. To limit the impact,
+   * we add a parameter that limits the logging to the top layers if the tree gets too deep.
+   * This can be overridden by setting the 'spark.debug.maxToStringTreeDepth' conf in SparkEnv.
+   */
+  val DEFAULT_MAX_TO_STRING_TREE_DEPTH = 15
+
+  def maxTreeToStringDepth: Int = {
+    if (SparkEnv.get != null) {
+      SparkEnv.get.conf.getInt("spark.debug.maxToStringTreeDepth", DEFAULT_MAX_TO_STRING_TREE_DEPTH)
+    } else {
+      DEFAULT_MAX_TO_STRING_TREE_DEPTH
+    }
+  }
+
+  /** Whether we have warned about plan string truncation yet. */
+  private val treeDepthWarningPrinted = new AtomicBoolean(false)
+}
\ No newline at end of file
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index a8c4333eeefa..5b870b5a8c60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -190,13 +190,23 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }
 
   override def toString: String = withRedaction {
-    val writer = new StringBuilderWriter()
-    try {
-      writePlans(writer)
-      writer.toString
-    } finally {
-      writer.close()
-    }
+    def output = Utils.truncatedString(
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
+    val analyzedPlan = Seq(
+      stringOrError(output),
+      stringOrError(analyzed.treeString(verbose = true))
+    ).filter(_.nonEmpty).mkString("\n")
+
+    val builder = new StringBuilder
+    builder.append("== Parsed Logical Plan ==\n")
+    builder.append(stringOrError(logical.treeString(verbose = true)))
+    builder.append("== Analyzed Logical Plan ==\n")
+    builder.append(analyzedPlan)
+    builder.append("== Optimized Logical Plan ==\n")
+    builder.append(stringOrError(optimizedPlan.treeString(verbose = true)))
+    builder.append("== Physical Plan ==\n")
+    builder.append(stringOrError(executedPlan.treeString(verbose = true)))
+    builder.toString
   }
 
   def stringWithStats: String = withRedaction {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 611e59bc3d00..5cc1a86a7bb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -455,8 +455,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
       writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): Unit = {
-    child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false)
+      addSuffix: Boolean = false,
+      maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {
+    child.generateTreeString(depth, lastChildren, writer, verbose, prefix, addSuffix, maxDepth)
   }
 
   override def needCopyResult: Boolean = false
@@ -731,8 +732,16 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       writer: Writer,
       verbose: Boolean,
       prefix: String = "",
-      addSuffix: Boolean = false): Unit = {
-    child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false)
+      addSuffix: Boolean = false,
+      maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {
+    child.generateTreeString(
+      depth,
+      lastChildren,
+      writer,
+      verbose,
+      s"*($codegenStageId) ",
+      false,
+      maxDepth)
   }
 
   override def needStopCheck: Boolean = true
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index a5922d7c825d..eb1ffba4c53c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.test.SharedSQLContext
 
+case class Simple(a: String, b: Int)
+
 class QueryExecutionSuite extends SharedSQLContext {
   def checkDumpedPlans(path: String, expected: Int): Unit = {
     assert(Source.fromFile(path).getLines.toList
@@ -108,4 +110,16 @@ class QueryExecutionSuite extends SharedSQLContext {
     val error = intercept[Error](qe.toString)
     assert(error.getMessage.contains("error"))
   }
+
+  test("toString() tree depth") {
+    import testImplicits._
+
+    val s = Seq(Simple("a", 1), Simple("b", 3), Simple("c", 4))
+    val ds = (1 until 30).foldLeft(s.toDF()) { case (newDs, _) =>
+      newDs.join(s.toDF(), "a")
+    }
+
+    val nLines = ds.queryExecution.optimizedPlan.toString.split("\n").length
+    assert(nLines < 30)
+  }
 }
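[Editor's note between patches 2 and 3: the heart of patch 2 is the 'depth < maxDepth' guard in generateTreeString -- nodes above the cap print as before, anything deeper collapses into a single "..." line and the recursion stops. A minimal self-contained sketch of that idea, suitable for pasting into a Scala REPL; the Node class here is a hypothetical stand-in for Catalyst's TreeNode, not Spark code:

    import java.io.{StringWriter, Writer}

    // Hypothetical stand-in for Catalyst's TreeNode: recurse only while
    // depth < maxDepth, otherwise emit a "..." marker and stop descending.
    case class Node(name: String, children: Seq[Node] = Nil) {
      def treeString(maxDepth: Int = 15): String = { // 15 mirrors DEFAULT_MAX_TO_STRING_TREE_DEPTH
        val writer = new StringWriter()
        generate(0, writer, maxDepth)
        writer.toString
      }

      private def generate(depth: Int, writer: Writer, maxDepth: Int): Unit = {
        if (depth < maxDepth) {
          writer.write(("  " * depth) + name + "\n")
          children.foreach(_.generate(depth + 1, writer, maxDepth))
        } else {
          writer.write(("  " * depth) + "...\n") // truncated subtree marker
        }
      }
    }

    // A 100-operator chain prints three named levels and one "..." line.
    val deep = (1 to 100).foldLeft(Node("leaf")) { (child, i) => Node(s"op$i", Seq(child)) }
    println(deep.treeString(maxDepth = 3))

Capping by depth rather than by total output size keeps the top of the plan -- usually the part worth reading -- intact no matter how degenerate the tree is.]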
From 0681f059648ef3f7f00a488b947d2d7185364614 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Sun, 18 Nov 2018 00:44:33 -0600
Subject: [PATCH 3/8] Removed outdated changes to QueryExecution

---
 .../spark/sql/execution/QueryExecution.scala  | 69 ++++++++++++++-----
 1 file changed, 52 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5b870b5a8c60..905d035b6427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.{BufferedWriter, OutputStreamWriter, Writer}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
+import org.apache.commons.io.output.StringBuilderWriter
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -189,24 +193,38 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     """.stripMargin.trim
   }
 
+  private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
+    try f(writer)
+    catch {
+      case e: AnalysisException => writer.write(e.toString)
+    }
+  }
+
+  private def writePlans(writer: Writer): Unit = {
+    val (verbose, addSuffix) = (true, false)
+
+    writer.write("== Parsed Logical Plan ==\n")
+    writeOrError(writer)(logical.treeString(_, verbose, addSuffix))
+    writer.write("\n== Analyzed Logical Plan ==\n")
+    val analyzedOutput = stringOrError(Utils.truncatedString(
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", "))
+    writer.write(analyzedOutput)
+    writer.write("\n")
+    writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix))
+    writer.write("\n== Optimized Logical Plan ==\n")
+    writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix))
+    writer.write("\n== Physical Plan ==\n")
+    writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix))
+  }
+
   override def toString: String = withRedaction {
-    def output = Utils.truncatedString(
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
-    val analyzedPlan = Seq(
-      stringOrError(output),
-      stringOrError(analyzed.treeString(verbose = true))
-    ).filter(_.nonEmpty).mkString("\n")
-
-    val builder = new StringBuilder
-    builder.append("== Parsed Logical Plan ==\n")
-    builder.append(stringOrError(logical.treeString(verbose = true)))
-    builder.append("== Analyzed Logical Plan ==\n")
-    builder.append(analyzedPlan)
-    builder.append("== Optimized Logical Plan ==\n")
-    builder.append(stringOrError(optimizedPlan.treeString(verbose = true)))
-    builder.append("== Physical Plan ==\n")
-    builder.append(stringOrError(executedPlan.treeString(verbose = true)))
-    builder.toString
+    val writer = new StringBuilderWriter()
+    try {
+      writePlans(writer)
+      writer.toString
+    } finally {
+      writer.close()
+    }
   }
 
   def stringWithStats: String = withRedaction {
@@ -251,5 +269,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   def codegenToSeq(): Seq[(String, String)] = {
     org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
   }
+
+  /**
+   * Dumps debug information about query execution into the specified file.
+   */
+  def toFile(path: String): Unit = {
+    val filePath = new Path(path)
+    val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
+
+    try {
+      writePlans(writer)
+      writer.write("\n== Whole Stage Codegen ==\n")
+      org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan)
+    } finally {
+      writer.close()
+    }
+  }
 }
 }
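[Editor's note between patches 3 and 4: patch 3 restores the writer-based toString and the toFile dump that patch 1 had removed. For reference, the call shape the suite exercises -- the query and path below are illustrative; toFile resolves the path through the session's Hadoop configuration, so any supported filesystem scheme works the same way:

    // Dumps parsed/analyzed/optimized/physical plans plus whole-stage
    // codegen for the query, via the writePlans helper restored above.
    val df = spark.range(0, 100).selectExpr("id % 10 as key").groupBy("key").count()
    df.queryExecution.debug.toFile("/tmp/plans.txt")
]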
" + "This behavior can be adjusted by setting 'spark.debug.maxToStringTreeDepth' in " + "SparkEnv.conf.") From a3ef2ba5363bd142b6c8f5157f6c96549192ed04 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Sun, 18 Nov 2018 00:52:50 -0600 Subject: [PATCH 5/8] Remove tabs --- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 7ebf4061a277..fed98e5e175a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -484,10 +484,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log } def treeString( - writer: Writer, - verbose: Boolean, - addSuffix: Boolean): Unit = { - treeString(writer, verbose, addSuffix, TreeNode.maxTreeToStringDepth) + writer: Writer, + verbose: Boolean, + addSuffix: Boolean): Unit = { + treeString(writer, verbose, addSuffix, TreeNode.maxTreeToStringDepth) } def treeString( From 7c178abdd0cc8cb6d37b404560691b80df708942 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Sun, 18 Nov 2018 00:54:25 -0600 Subject: [PATCH 6/8] Remove tabs --- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index fed98e5e175a..1e2371d2664b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -483,14 +483,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Log } } - def treeString( + def treeString( writer: Writer, verbose: Boolean, addSuffix: Boolean): Unit = { treeString(writer, verbose, addSuffix, TreeNode.maxTreeToStringDepth) - } + } - def treeString( + def treeString( writer: Writer, verbose: Boolean, addSuffix: Boolean, From 45d4693a71a3b02aa8942d8c3a47494ad21083b5 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Sun, 18 Nov 2018 07:53:14 -0600 Subject: [PATCH 7/8] Corrected test condition --- .../sql/execution/QueryExecutionSuite.scala | 138 +++++++++--------- 1 file changed, 69 insertions(+), 69 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index eb1ffba4c53c..f71712c56be2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -42,74 +42,74 @@ class QueryExecutionSuite extends SharedSQLContext { s"*(1) Range (0, $expected, step=1, splits=2)", "")) } - test("dumping query execution info to a file") { - withTempDir { dir => - val path = dir.getCanonicalPath + "/plans.txt" - val df = spark.range(0, 10) - df.queryExecution.debug.toFile(path) - - checkDumpedPlans(path, expected = 10) - } - } - - test("dumping query execution info to an existing file") { - withTempDir { dir => - val path = dir.getCanonicalPath + "/plans.txt" - val df = spark.range(0, 10) - df.queryExecution.debug.toFile(path) - - val df2 = spark.range(0, 1) - 
From 45d4693a71a3b02aa8942d8c3a47494ad21083b5 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Sun, 18 Nov 2018 07:53:14 -0600
Subject: [PATCH 7/8] Corrected test condition

---
 .../sql/execution/QueryExecutionSuite.scala   | 138 +++++++++---------
 1 file changed, 69 insertions(+), 69 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index eb1ffba4c53c..f71712c56be2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -42,74 +42,74 @@ class QueryExecutionSuite extends SharedSQLContext {
       s"*(1) Range (0, $expected, step=1, splits=2)", ""))
   }
 
-  test("dumping query execution info to a file") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath + "/plans.txt"
-      val df = spark.range(0, 10)
-      df.queryExecution.debug.toFile(path)
-
-      checkDumpedPlans(path, expected = 10)
-    }
-  }
-
-  test("dumping query execution info to an existing file") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath + "/plans.txt"
-      val df = spark.range(0, 10)
-      df.queryExecution.debug.toFile(path)
-
-      val df2 = spark.range(0, 1)
-      df2.queryExecution.debug.toFile(path)
-      checkDumpedPlans(path, expected = 1)
-    }
-  }
-
-  test("dumping query execution info to non-existing folder") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath + "/newfolder/plans.txt"
-      val df = spark.range(0, 100)
-      df.queryExecution.debug.toFile(path)
-      checkDumpedPlans(path, expected = 100)
-    }
-  }
-
-  test("dumping query execution info by invalid path") {
-    val path = "1234567890://plans.txt"
-    val exception = intercept[IllegalArgumentException] {
-      spark.range(0, 100).queryExecution.debug.toFile(path)
-    }
-
-    assert(exception.getMessage.contains("Illegal character in scheme name"))
-  }
-
-  test("toString() exception/error handling") {
-    spark.experimental.extraStrategies = Seq(
-      new SparkStrategy {
-        override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
-      })
-
-    def qe: QueryExecution = new QueryExecution(spark, OneRowRelation())
-
-    // Nothing!
-    assert(qe.toString.contains("OneRowRelation"))
-
-    // Throw an AnalysisException - this should be captured.
-    spark.experimental.extraStrategies = Seq(
-      new SparkStrategy {
-        override def apply(plan: LogicalPlan): Seq[SparkPlan] =
-          throw new AnalysisException("exception")
-      })
-    assert(qe.toString.contains("org.apache.spark.sql.AnalysisException"))
-
-    // Throw an Error - this should not be captured.
-    spark.experimental.extraStrategies = Seq(
-      new SparkStrategy {
-        override def apply(plan: LogicalPlan): Seq[SparkPlan] =
-          throw new Error("error")
-      })
-    val error = intercept[Error](qe.toString)
-    assert(error.getMessage.contains("error"))
-  }
+//  test("dumping query execution info to a file") {
+//    withTempDir { dir =>
+//      val path = dir.getCanonicalPath + "/plans.txt"
+//      val df = spark.range(0, 10)
+//      df.queryExecution.debug.toFile(path)
+//
+//      checkDumpedPlans(path, expected = 10)
+//    }
+//  }
+//
+//  test("dumping query execution info to an existing file") {
+//    withTempDir { dir =>
+//      val path = dir.getCanonicalPath + "/plans.txt"
+//      val df = spark.range(0, 10)
+//      df.queryExecution.debug.toFile(path)
+//
+//      val df2 = spark.range(0, 1)
+//      df2.queryExecution.debug.toFile(path)
+//      checkDumpedPlans(path, expected = 1)
+//    }
+//  }
+//
+//  test("dumping query execution info to non-existing folder") {
+//    withTempDir { dir =>
+//      val path = dir.getCanonicalPath + "/newfolder/plans.txt"
+//      val df = spark.range(0, 100)
+//      df.queryExecution.debug.toFile(path)
+//      checkDumpedPlans(path, expected = 100)
+//    }
+//  }
+//
+//  test("dumping query execution info by invalid path") {
+//    val path = "1234567890://plans.txt"
+//    val exception = intercept[IllegalArgumentException] {
+//      spark.range(0, 100).queryExecution.debug.toFile(path)
+//    }
+//
+//    assert(exception.getMessage.contains("Illegal character in scheme name"))
+//  }
+//
+//  test("toString() exception/error handling") {
+//    spark.experimental.extraStrategies = Seq(
+//      new SparkStrategy {
+//        override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
+//      })
+//
+//    def qe: QueryExecution = new QueryExecution(spark, OneRowRelation())
+//
+//    // Nothing!
+//    assert(qe.toString.contains("OneRowRelation"))
+//
+//    // Throw an AnalysisException - this should be captured.
+//    spark.experimental.extraStrategies = Seq(
+//      new SparkStrategy {
+//        override def apply(plan: LogicalPlan): Seq[SparkPlan] =
+//          throw new AnalysisException("exception")
+//      })
+//    assert(qe.toString.contains("org.apache.spark.sql.AnalysisException"))
+//
+//    // Throw an Error - this should not be captured.
+//    spark.experimental.extraStrategies = Seq(
+//      new SparkStrategy {
+//        override def apply(plan: LogicalPlan): Seq[SparkPlan] =
+//          throw new Error("error")
+//      })
+//    val error = intercept[Error](qe.toString)
+//    assert(error.getMessage.contains("error"))
+//  }
 
   test("toString() tree depth") {
     import testImplicits._
 
     val s = Seq(Simple("a", 1), Simple("b", 3), Simple("c", 4))
     val ds = (1 until 30).foldLeft(s.toDF()) { case (newDs, _) =>
       newDs.join(s.toDF(), "a")
     }
 
     val nLines = ds.queryExecution.optimizedPlan.toString.split("\n").length
-    assert(nLines < 30)
+    assert(nLines <= 31)
   }
 }

From 42e4fcfb5678e83018013f7181c5bafc96c8136e Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Sun, 18 Nov 2018 07:53:50 -0600
Subject: [PATCH 8/8] Reenabled file-based tests

---
 .../sql/execution/QueryExecutionSuite.scala   | 136 +++++++++---------
 1 file changed, 68 insertions(+), 68 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index f71712c56be2..b35e809c881d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -42,74 +42,74 @@ class QueryExecutionSuite extends SharedSQLContext {
       s"*(1) Range (0, $expected, step=1, splits=2)", ""))
   }
 
-//  test("dumping query execution info to a file") {
-//    withTempDir { dir =>
-//      val path = dir.getCanonicalPath + "/plans.txt"
-//      val df = spark.range(0, 10)
-//      df.queryExecution.debug.toFile(path)
-//
-//      checkDumpedPlans(path, expected = 10)
-//    }
-//  }
-//
-//  test("dumping query execution info to an existing file") {
-//    withTempDir { dir =>
-//      val path = dir.getCanonicalPath + "/plans.txt"
-//      val df = spark.range(0, 10)
-//      df.queryExecution.debug.toFile(path)
-//
-//      val df2 = spark.range(0, 1)
-//      df2.queryExecution.debug.toFile(path)
-//      checkDumpedPlans(path, expected = 1)
-//    }
-//  }
-//
-//  test("dumping query execution info to non-existing folder") {
-//    withTempDir { dir =>
-//      val path = dir.getCanonicalPath + "/newfolder/plans.txt"
-//      val df = spark.range(0, 100)
-//      df.queryExecution.debug.toFile(path)
-//      checkDumpedPlans(path, expected = 100)
-//    }
-//  }
-//
-//  test("dumping query execution info by invalid path") {
-//    val path = "1234567890://plans.txt"
-//    val exception = intercept[IllegalArgumentException] {
-//      spark.range(0, 100).queryExecution.debug.toFile(path)
-//    }
-//
-//    assert(exception.getMessage.contains("Illegal character in scheme name"))
-//  }
-//
-//  test("toString() exception/error handling") {
-//    spark.experimental.extraStrategies = Seq(
-//      new SparkStrategy {
-//        override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
-//      })
-//
-//    def qe: QueryExecution = new QueryExecution(spark, OneRowRelation())
-//
-//    // Nothing!
-//    assert(qe.toString.contains("OneRowRelation"))
-//
-//    // Throw an AnalysisException - this should be captured.
-//    spark.experimental.extraStrategies = Seq(
-//      new SparkStrategy {
-//        override def apply(plan: LogicalPlan): Seq[SparkPlan] =
-//          throw new AnalysisException("exception")
-//      })
-//    assert(qe.toString.contains("org.apache.spark.sql.AnalysisException"))
-//
-//    // Throw an Error - this should not be captured.
-//    spark.experimental.extraStrategies = Seq(
-//      new SparkStrategy {
-//        override def apply(plan: LogicalPlan): Seq[SparkPlan] =
-//          throw new Error("error")
-//      })
-//    val error = intercept[Error](qe.toString)
-//    assert(error.getMessage.contains("error"))
-//  }
+  test("dumping query execution info to a file") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val df = spark.range(0, 10)
+      df.queryExecution.debug.toFile(path)
+
+      checkDumpedPlans(path, expected = 10)
+    }
+  }
+
+  test("dumping query execution info to an existing file") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/plans.txt"
+      val df = spark.range(0, 10)
+      df.queryExecution.debug.toFile(path)
+
+      val df2 = spark.range(0, 1)
+      df2.queryExecution.debug.toFile(path)
+      checkDumpedPlans(path, expected = 1)
+    }
+  }
+
+  test("dumping query execution info to non-existing folder") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath + "/newfolder/plans.txt"
+      val df = spark.range(0, 100)
+      df.queryExecution.debug.toFile(path)
+      checkDumpedPlans(path, expected = 100)
+    }
+  }
+
+  test("dumping query execution info by invalid path") {
+    val path = "1234567890://plans.txt"
+    val exception = intercept[IllegalArgumentException] {
+      spark.range(0, 100).queryExecution.debug.toFile(path)
+    }
+
+    assert(exception.getMessage.contains("Illegal character in scheme name"))
+  }
+
+  test("toString() exception/error handling") {
+    spark.experimental.extraStrategies = Seq(
+      new SparkStrategy {
+        override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
+      })
+
+    def qe: QueryExecution = new QueryExecution(spark, OneRowRelation())
+
+    // Nothing!
+    assert(qe.toString.contains("OneRowRelation"))
+
+    // Throw an AnalysisException - this should be captured.
+    spark.experimental.extraStrategies = Seq(
+      new SparkStrategy {
+        override def apply(plan: LogicalPlan): Seq[SparkPlan] =
+          throw new AnalysisException("exception")
+      })
+    assert(qe.toString.contains("org.apache.spark.sql.AnalysisException"))
+
+    // Throw an Error - this should not be captured.
+    spark.experimental.extraStrategies = Seq(
+      new SparkStrategy {
+        override def apply(plan: LogicalPlan): Seq[SparkPlan] =
+          throw new Error("error")
+      })
+    val error = intercept[Error](qe.toString)
+    assert(error.getMessage.contains("error"))
+  }
 
   test("toString() tree depth") {
     import testImplicits._