From bf9f07bb25df96e369e13f164b6a8ca1030bc8a4 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Sun, 9 Feb 2020 18:35:17 +0800 Subject: [PATCH 1/3] Impl common explain field formatter --- .../spark/sql/catalyst/plans/QueryPlan.scala | 3 + .../sql/execution/DataSourceScanExec.scala | 4 +- .../spark/sql/execution/ExplainUtils.scala | 14 +- .../spark/sql/execution/SparkPlan.scala | 15 +- .../execution/basicPhysicalOperators.scala | 6 +- .../sql/execution/exchange/Exchange.scala | 2 +- .../joins/CartesianProductExec.scala | 2 +- .../spark/sql/execution/joins/HashJoin.scala | 6 +- .../execution/joins/SortMergeJoinExec.scala | 6 +- .../sql-tests/results/explain.sql.out | 260 ++++++++++-------- 10 files changed, 188 insertions(+), 130 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index c5abb6378ff7c..0faac665df7cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -189,8 +189,11 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT val codegenIdStr = getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s"[codegen id : $id]").getOrElse("") val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown") + val argStr = argString(SQLConf.get.maxToStringFields) + s""" |($operatorId) $nodeName $codegenIdStr + |Arguments: ${if (argStr != null && !argStr.isEmpty) argStr else "None"} """.stripMargin } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 0d759085a7e2c..c1f7f0a195491 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -76,7 +76,7 @@ trait DataSourceScanExec extends LeafExecNode { s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Output: ${producedAttributes.mkString("[", ", ", "]")} + |${ExplainUtils.generateFieldString("Output", producedAttributes)} |${metadataStr.mkString("\n")} """.stripMargin } @@ -377,7 +377,7 @@ case class FileSourceScanExec( s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Output: ${producedAttributes.mkString("[", ", ", "]")} + |${ExplainUtils.generateFieldString("Output", producedAttributes)} |${metadataStr.mkString("\n")} """.stripMargin } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala index d4fe272f8c95f..8561cb18c8601 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.trees.TreeNodeTag object ExplainUtils { /** @@ -171,7 +170,7 @@ object ExplainUtils { var currentCodegenId = -1 plan.foreach { case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId - case p: InputAdapter => currentCodegenId = -1 + case _: InputAdapter => currentCodegenId = -1 case other: QueryPlan[_] => if (currentCodegenId != -1) { other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId) @@ -182,6 +181,17 @@ object ExplainUtils { } } + /** + * Generate detailed field string with different format based on type of input value + */ + def generateFieldString(fieldName: String, values: Any): String = values match { + case iter: Iterable[_] if (iter.size == 0) => s"${fieldName}: []" + case iter: Iterable[_] => s"${fieldName} [${iter.size}]: ${iter.mkString("[", ", ", "]")}" + case str: String if (str == null || str.isEmpty) => s"${fieldName}: None" + case str: String => s"${fieldName}: ${str}" + case _ => s"${fieldName}: Unknown" + } + /** * Given a input plan, returns an array of tuples comprising of : * 1. Hosting opeator id. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 3301e9b5ab180..22501d28db90c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch object SparkPlan { @@ -512,9 +513,11 @@ trait LeafExecNode extends SparkPlan { override final def children: Seq[SparkPlan] = Nil override def producedAttributes: AttributeSet = outputSet override def verboseStringWithOperatorId(): String = { + val argumentString = argString(SQLConf.get.maxToStringFields) s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Output: ${producedAttributes.mkString("[", ", ", "]")} + |${ExplainUtils.generateFieldString("Arguments", argumentString)} + |${ExplainUtils.generateFieldString("Output", producedAttributes)} """.stripMargin } } @@ -531,9 +534,11 @@ trait UnaryExecNode extends SparkPlan { override final def children: Seq[SparkPlan] = child :: Nil override def verboseStringWithOperatorId(): String = { + val argumentString = argString(SQLConf.get.maxToStringFields) s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Input: ${child.output.mkString("[", ", ", "]")} + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("Arguments", argumentString)} """.stripMargin } } @@ -544,10 +549,12 @@ trait BinaryExecNode extends SparkPlan { override final def children: Seq[SparkPlan] = Seq(left, right) override def verboseStringWithOperatorId(): String = { + val argumentString = argString(SQLConf.get.maxToStringFields) s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Left output: ${left.output.mkString("[", ", ", "]")} - |Right output: ${right.output.mkString("[", ", ", "]")} + |${ExplainUtils.generateFieldString("Left output", left.output)} + |${ExplainUtils.generateFieldString("Right output", right.output)} + |${ExplainUtils.generateFieldString("Arguments", argumentString)} """.stripMargin } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index c35c48496e1c9..e7d2b627d0815 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -86,8 +86,8 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) override def verboseStringWithOperatorId(): String = { s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Output : ${projectList.mkString("[", ", ", "]")} - |Input : ${child.output.mkString("[", ", ", "]")} + |${ExplainUtils.generateFieldString("Output", projectList)} + |${ExplainUtils.generateFieldString("Input", child.output)} """.stripMargin } } @@ -243,7 +243,7 @@ case class FilterExec(condition: Expression, child: SparkPlan) override def verboseStringWithOperatorId(): String = { s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Input : ${child.output.mkString("[", ", ", "]")} + |${ExplainUtils.generateFieldString("Input", child.output)} |Condition : ${condition} """.stripMargin } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala index 849ff384c130a..dda9a637194fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala @@ -92,7 +92,7 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan val reuse_op_str = ExplainUtils.getOpId(child) s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${cdgen} [Reuses operator id: $reuse_op_str] - |Output : ${output} + |${ExplainUtils.generateFieldString("Output", output)} """.stripMargin } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 29645a736548c..7e2f487fdcc5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -73,7 +73,7 @@ case class CartesianProductExec( s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Join condition: ${joinCondStr} + |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} """.stripMargin } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 137f0b87a2f3d..99cf60273bf08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -49,9 +49,9 @@ trait HashJoin { s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Left keys: ${leftKeys} - |Right keys: ${rightKeys} - |Join condition: ${joinCondStr} + |${ExplainUtils.generateFieldString("Left keys", leftKeys)} + |${ExplainUtils.generateFieldString("Right keys", rightKeys)} + |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} """.stripMargin } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 62eea611556ff..7a08dd1afd3a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -63,9 +63,9 @@ case class SortMergeJoinExec( } else "None" s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Left keys : ${leftKeys} - |Right keys: ${rightKeys} - |Join condition : ${joinCondStr} + |${ExplainUtils.generateFieldString("Left keys", leftKeys)} + |${ExplainUtils.generateFieldString("Right keys", rightKeys)} + |${ExplainUtils.generateFieldString("Join condition", joinCondStr)} """.stripMargin } diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index bc28d7f87bf00..c8ce4e4e9e385 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -65,22 +65,23 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) (4) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) HashAggregate [codegen id : 1] Input: [key#x, val#x] @@ -90,7 +91,8 @@ Aggregate Attributes: [max#x] Results: [key#x, max#x] (6) Exchange -Input: [key#x, max#x] +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] (7) HashAggregate [codegen id : 2] Input: [key#x, max#x] @@ -100,10 +102,12 @@ Aggregate Attributes: [max(val#x)#x] Results: [key#x, max(val#x)#x AS max(val)#x] (8) Exchange -Input: [key#x, max(val)#x] +Input [2]: [key#x, max(val)#x] +Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] (9) Sort [codegen id : 3] -Input: [key#x, max(val)#x] +Input [2]: [key#x, max(val)#x] +Arguments: [key#x ASC NULLS FIRST], true, 0 -- !query @@ -129,22 +133,23 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) (4) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) HashAggregate [codegen id : 1] Input: [key#x, val#x] @@ -154,7 +159,8 @@ Aggregate Attributes: [max#x] Results: [key#x, max#x] (6) Exchange -Input: [key#x, max#x] +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] (7) HashAggregate [codegen id : 2] Input: [key#x, max#x] @@ -164,12 +170,12 @@ Aggregate Attributes: [max(val#x)#x] Results: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x] (8) Filter [codegen id : 2] -Input : [key#x, max(val)#x, max(val#x)#x] +Input [3]: [key#x, max(val)#x, max(val#x)#x] Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0)) (9) Project [codegen id : 2] -Output : [key#x, max(val)#x] -Input : [key#x, max(val)#x, max(val#x)#x] +Output [2]: [key#x, max(val)#x] +Input [3]: [key#x, max(val)#x, max(val#x)#x] -- !query @@ -196,42 +202,45 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) (4) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,0)] ReadSchema: struct (6) ColumnarToRow [codegen id : 2] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (7) Filter [codegen id : 2] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) (8) Project [codegen id : 2] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (9) Union +Arguments: None (10) HashAggregate [codegen id : 3] Input: [key#x, val#x] @@ -241,7 +250,8 @@ Aggregate Attributes: [] Results: [key#x, val#x] (11) Exchange -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] (12) HashAggregate [codegen id : 4] Input: [key#x, val#x] @@ -274,47 +284,50 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key)] ReadSchema: struct (2) ColumnarToRow [codegen id : 2] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Filter [codegen id : 2] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : isnotnull(key#x) (4) Project [codegen id : 2] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) Scan parquet default.explain_temp2 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct (6) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (7) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : isnotnull(key#x) (8) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (9) BroadcastExchange -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] (10) BroadcastHashJoin [codegen id : 2] -Left keys: List(key#x) -Right keys: List(key#x) +Left keys [1]: [key#x] +Right keys [1]: [key#x] Join condition: None @@ -339,38 +352,41 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct (2) ColumnarToRow [codegen id : 2] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Scan parquet default.explain_temp2 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key)] ReadSchema: struct (4) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (5) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : isnotnull(key#x) (6) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (7) BroadcastExchange -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] (8) BroadcastHashJoin [codegen id : 2] -Left keys: List(key#x) -Right keys: List(key#x) +Left keys [1]: [key#x] +Right keys [1]: [key#x] Join condition: None @@ -396,22 +412,23 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) (4) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] ===== Subqueries ===== @@ -426,22 +443,23 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery (5) Scan parquet default.explain_temp2 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)] ReadSchema: struct (6) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (7) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2)) (8) Project [codegen id : 1] -Output : [key#x] -Input : [key#x, val#x] +Output [1]: [key#x] +Input [2]: [key#x, val#x] (9) HashAggregate [codegen id : 1] Input: [key#x] @@ -451,7 +469,8 @@ Aggregate Attributes: [max#x] Results: [max#x] (10) Exchange -Input: [max#x] +Input [1]: [max#x] +Arguments: SinglePartition, true, [id=#x] (11) HashAggregate [codegen id : 2] Input: [max#x] @@ -471,22 +490,23 @@ Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery (12) Scan parquet default.explain_temp3 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp3] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct (13) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (14) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) (15) Project [codegen id : 1] -Output : [key#x] -Input : [key#x, val#x] +Output [1]: [key#x] +Input [2]: [key#x, val#x] (16) HashAggregate [codegen id : 1] Input: [key#x] @@ -496,7 +516,8 @@ Aggregate Attributes: [max#x] Results: [max#x] (17) Exchange -Input: [max#x] +Input [1]: [max#x] +Arguments: SinglePartition, true, [id=#x] (18) HashAggregate [codegen id : 2] Input: [max#x] @@ -527,16 +548,17 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x])) ===== Subqueries ===== @@ -552,22 +574,23 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery (4) Scan parquet default.explain_temp2 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp2] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct (5) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (6) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) (7) Project [codegen id : 1] -Output : [key#x] -Input : [key#x, val#x] +Output [1]: [key#x] +Input [2]: [key#x, val#x] (8) HashAggregate [codegen id : 1] Input: [key#x] @@ -577,7 +600,8 @@ Aggregate Attributes: [max#x] Results: [max#x] (9) Exchange -Input: [max#x] +Input [1]: [max#x] +Arguments: SinglePartition, true, [id=#x] (10) HashAggregate [codegen id : 2] Input: [max#x] @@ -597,22 +621,23 @@ Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery (11) Scan parquet default.explain_temp3 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp3] PushedFilters: [IsNotNull(val), GreaterThan(val,0)] ReadSchema: struct (12) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (13) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) (14) Project [codegen id : 1] -Output : [key#x] -Input : [key#x, val#x] +Output [1]: [key#x] +Input [2]: [key#x, val#x] (15) HashAggregate [codegen id : 1] Input: [key#x] @@ -653,10 +678,11 @@ ReadSchema: struct<> (2) ColumnarToRow [codegen id : 1] Input: [] +Arguments: None (3) Project [codegen id : 1] -Output : [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] -Input : [] +Output [1]: [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] +Input: [] ===== Subqueries ===== @@ -669,13 +695,14 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery (4) Scan parquet default.explain_temp1 -Output: [key#x] +Output [1]: [key#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct (5) ColumnarToRow [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] +Arguments: None (6) HashAggregate [codegen id : 1] Input: [key#x] @@ -685,7 +712,8 @@ Aggregate Attributes: [sum#x, count#xL] Results: [sum#x, count#xL] (7) Exchange -Input: [sum#x, count#xL] +Input [2]: [sum#x, count#xL] +Arguments: SinglePartition, true, [id=#x] (8) HashAggregate [codegen id : 2] Input: [sum#x, count#xL] @@ -722,47 +750,50 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct (2) ColumnarToRow [codegen id : 2] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Filter [codegen id : 2] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) (4) Project [codegen id : 2] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct (6) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (7) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) (8) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (9) BroadcastExchange -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] (10) BroadcastHashJoin [codegen id : 2] -Left keys: List(key#x) -Right keys: List(key#x) +Left keys [1]: [key#x] +Right keys [1]: [key#x] Join condition: None @@ -793,22 +824,23 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] PushedFilters: [IsNotNull(key), GreaterThan(key,10)] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] +Input [2]: [key#x, val#x] +Arguments: None (3) Filter [codegen id : 1] -Input : [key#x, val#x] +Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) (4) Project [codegen id : 1] -Output : [key#x, val#x] -Input : [key#x, val#x] +Output [2]: [key#x, val#x] +Input [2]: [key#x, val#x] (5) HashAggregate [codegen id : 1] Input: [key#x, val#x] @@ -818,7 +850,8 @@ Aggregate Attributes: [max#x] Results: [key#x, max#x] (6) Exchange -Input: [key#x, max#x] +Input [2]: [key#x, max#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] (7) HashAggregate [codegen id : 4] Input: [key#x, max#x] @@ -828,7 +861,7 @@ Aggregate Attributes: [max(val#x)#x] Results: [key#x, max(val#x)#x AS max(val)#x] (8) ReusedExchange [Reuses operator id: 6] -Output : ArrayBuffer(key#x, max#x) +Output [2]: [key#x, max#x] (9) HashAggregate [codegen id : 3] Input: [key#x, max#x] @@ -838,11 +871,12 @@ Aggregate Attributes: [max(val#x)#x] Results: [key#x, max(val#x)#x AS max(val)#x] (10) BroadcastExchange -Input: [key#x, max(val)#x] +Input [2]: [key#x, max(val)#x] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] (11) BroadcastHashJoin [codegen id : 4] -Left keys: List(key#x) -Right keys: List(key#x) +Left keys [1]: [key#x] +Right keys [1]: [key#x] Join condition: None @@ -861,13 +895,17 @@ Execute CreateViewCommand (1) (1) Execute CreateViewCommand +Arguments: None Output: [] (2) CreateViewCommand +Arguments: `explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView (3) UnresolvedRelation +Arguments: [explain_temp1] -(4) Project +(4) Project +Arguments: ['key, 'val] -- !query From f0bcf1280dc556447b225d6519177c694efe7da8 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Sun, 16 Feb 2020 23:57:10 +0800 Subject: [PATCH 2/3] rebase and address arguments comment --- .../spark/sql/catalyst/plans/QueryPlan.scala | 10 +- .../spark/sql/execution/SparkPlan.scala | 44 ++- .../aggregate/BaseAggregateExec.scala | 15 +- .../sql-tests/results/explain.sql.out | 357 +++++++++--------- 4 files changed, 211 insertions(+), 215 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 0faac665df7cf..d0dfa3c847a00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -189,12 +189,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT val codegenIdStr = getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s"[codegen id : $id]").getOrElse("") val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown") - val argStr = argString(SQLConf.get.maxToStringFields) + val argumentString = argString(SQLConf.get.maxToStringFields) - s""" + val result = s""" |($operatorId) $nodeName $codegenIdStr - |Arguments: ${if (argStr != null && !argStr.isEmpty) argStr else "None"} """.stripMargin + if (argumentString != null && !argumentString.isEmpty) { + s"""${result} |Arguments: $argumentString\n""".stripMargin + } else { + result + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 22501d28db90c..ba293dbd6ee3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -514,11 +514,15 @@ trait LeafExecNode extends SparkPlan { override def producedAttributes: AttributeSet = outputSet override def verboseStringWithOperatorId(): String = { val argumentString = argString(SQLConf.get.maxToStringFields) - s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Arguments", argumentString)} - |${ExplainUtils.generateFieldString("Output", producedAttributes)} - """.stripMargin + val result = s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |${ExplainUtils.generateFieldString("Output", producedAttributes)} + """.stripMargin + if (argumentString != null && !argumentString.isEmpty) { + s"""${result} |Arguments: $argumentString\n""".stripMargin + } else { + s"${result}" + } } } @@ -535,11 +539,15 @@ trait UnaryExecNode extends SparkPlan { override final def children: Seq[SparkPlan] = child :: Nil override def verboseStringWithOperatorId(): String = { val argumentString = argString(SQLConf.get.maxToStringFields) - s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Input", child.output)} - |${ExplainUtils.generateFieldString("Arguments", argumentString)} - """.stripMargin + val result = s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |${ExplainUtils.generateFieldString("Input", child.output)} + """.stripMargin + if (argumentString != null && !argumentString.isEmpty) { + s"""${result} |Arguments: $argumentString\n""".stripMargin + } else { + s"${result}" + } } } @@ -550,11 +558,15 @@ trait BinaryExecNode extends SparkPlan { override final def children: Seq[SparkPlan] = Seq(left, right) override def verboseStringWithOperatorId(): String = { val argumentString = argString(SQLConf.get.maxToStringFields) - s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Left output", left.output)} - |${ExplainUtils.generateFieldString("Right output", right.output)} - |${ExplainUtils.generateFieldString("Arguments", argumentString)} - """.stripMargin + val result = s""" + |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} + |${ExplainUtils.generateFieldString("Left output", left.output)} + |${ExplainUtils.generateFieldString("Right output", right.output)} + """.stripMargin + if (argumentString != null && !argumentString.isEmpty) { + s"""${result} |Arguments: $argumentString\n""".stripMargin + } else { + s"${result}" + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala index 0eaa0f53fdacd..19d7263feb2d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala @@ -31,18 +31,13 @@ trait BaseAggregateExec extends UnaryExecNode { def resultExpressions: Seq[NamedExpression] override def verboseStringWithOperatorId(): String = { - val inputString = child.output.mkString("[", ", ", "]") - val keyString = groupingExpressions.mkString("[", ", ", "]") - val functionString = aggregateExpressions.mkString("[", ", ", "]") - val aggregateAttributeString = aggregateAttributes.mkString("[", ", ", "]") - val resultString = resultExpressions.mkString("[", ", ", "]") s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |Input: $inputString - |Keys: $keyString - |Functions: $functionString - |Aggregate Attributes: $aggregateAttributeString - |Results: $resultString + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("Keys", groupingExpressions)} + |${ExplainUtils.generateFieldString("Functions", aggregateExpressions)} + |${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)} + |${ExplainUtils.generateFieldString("Results", resultExpressions)} """.stripMargin } } diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index c8ce4e4e9e385..aff7f5c54f499 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -73,8 +73,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) @@ -84,27 +83,27 @@ Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] (5) HashAggregate [codegen id : 1] -Input: [key#x, val#x] -Keys: [key#x] -Functions: [partial_max(val#x)] -Aggregate Attributes: [max#x] -Results: [key#x, max#x] +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] - + (7) HashAggregate [codegen id : 2] -Input: [key#x, max#x] -Keys: [key#x] -Functions: [max(val#x)] -Aggregate Attributes: [max(val#x)#x] -Results: [key#x, max(val#x)#x AS max(val)#x] +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [2]: [key#x, max(val#x)#x AS max(val)#x] (8) Exchange Input [2]: [key#x, max(val)#x] Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] - + (9) Sort [codegen id : 3] Input [2]: [key#x, max(val)#x] Arguments: [key#x ASC NULLS FIRST], true, 0 @@ -141,8 +140,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) @@ -152,22 +150,22 @@ Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] (5) HashAggregate [codegen id : 1] -Input: [key#x, val#x] -Keys: [key#x] -Functions: [partial_max(val#x)] -Aggregate Attributes: [max#x] -Results: [key#x, max#x] +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] - + (7) HashAggregate [codegen id : 2] -Input: [key#x, max#x] -Keys: [key#x] -Functions: [max(val#x)] -Aggregate Attributes: [max(val#x)#x] -Results: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x] +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x] (8) Filter [codegen id : 2] Input [3]: [key#x, max(val)#x, max(val#x)#x] @@ -210,8 +208,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) @@ -229,8 +226,7 @@ ReadSchema: struct (6) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] -Arguments: None - + (7) Filter [codegen id : 2] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) @@ -240,25 +236,24 @@ Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] (9) Union -Arguments: None (10) HashAggregate [codegen id : 3] -Input: [key#x, val#x] -Keys: [key#x, val#x] +Input [2]: [key#x, val#x] +Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] -Results: [key#x, val#x] +Results [2]: [key#x, val#x] (11) Exchange Input [2]: [key#x, val#x] Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] - + (12) HashAggregate [codegen id : 4] -Input: [key#x, val#x] -Keys: [key#x, val#x] +Input [2]: [key#x, val#x] +Keys [2]: [key#x, val#x] Functions: [] Aggregate Attributes: [] -Results: [key#x, val#x] +Results [2]: [key#x, val#x] -- !query @@ -292,8 +287,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] -Arguments: None - + (3) Filter [codegen id : 2] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) @@ -311,8 +305,7 @@ ReadSchema: struct (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (7) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) @@ -324,7 +317,7 @@ Input [2]: [key#x, val#x] (9) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] - + (10) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] @@ -359,8 +352,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] -Arguments: None - + (3) Scan parquet default.explain_temp2 Output [2]: [key#x, val#x] Batched: true @@ -370,8 +362,7 @@ ReadSchema: struct (4) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (5) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) @@ -383,7 +374,7 @@ Input [2]: [key#x, val#x] (7) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] - + (8) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] @@ -420,8 +411,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) @@ -451,8 +441,7 @@ ReadSchema: struct (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (7) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2)) @@ -462,22 +451,22 @@ Output [1]: [key#x] Input [2]: [key#x, val#x] (9) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_max(key#x)] -Aggregate Attributes: [max#x] -Results: [max#x] +Functions [1]: [partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] (10) Exchange Input [1]: [max#x] Arguments: SinglePartition, true, [id=#x] - + (11) HashAggregate [codegen id : 2] -Input: [max#x] +Input [1]: [max#x] Keys: [] -Functions: [max(key#x)] -Aggregate Attributes: [max(key#x)#x] -Results: [max(key#x)#x AS max(key)#x] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x] * HashAggregate (18) @@ -498,8 +487,7 @@ ReadSchema: struct (13) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (14) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) @@ -509,22 +497,22 @@ Output [1]: [key#x] Input [2]: [key#x, val#x] (16) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_max(key#x)] -Aggregate Attributes: [max#x] -Results: [max#x] +Functions [1]: [partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] (17) Exchange Input [1]: [max#x] Arguments: SinglePartition, true, [id=#x] - + (18) HashAggregate [codegen id : 2] -Input: [max#x] +Input [1]: [max#x] Keys: [] -Functions: [max(key#x)] -Aggregate Attributes: [max(key#x)#x] -Results: [max(key#x)#x AS max(key)#x] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] -- !query @@ -555,8 +543,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x])) @@ -582,8 +569,7 @@ ReadSchema: struct (5) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (6) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) @@ -593,22 +579,22 @@ Output [1]: [key#x] Input [2]: [key#x, val#x] (8) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_max(key#x)] -Aggregate Attributes: [max#x] -Results: [max#x] +Functions [1]: [partial_max(key#x)] +Aggregate Attributes [1]: [max#x] +Results [1]: [max#x] (9) Exchange Input [1]: [max#x] Arguments: SinglePartition, true, [id=#x] - + (10) HashAggregate [codegen id : 2] -Input: [max#x] +Input [1]: [max#x] Keys: [] -Functions: [max(key#x)] -Aggregate Attributes: [max(key#x)#x] -Results: [max(key#x)#x AS max(key)#x] +Functions [1]: [max(key#x)] +Aggregate Attributes [1]: [max(key#x)#x] +Results [1]: [max(key#x)#x AS max(key)#x] Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] * HashAggregate (17) @@ -629,8 +615,7 @@ ReadSchema: struct (12) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (13) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) @@ -640,21 +625,22 @@ Output [1]: [key#x] Input [2]: [key#x, val#x] (15) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_avg(cast(key#x as bigint))] -Aggregate Attributes: [sum#x, count#xL] -Results: [sum#x, count#xL] +Functions [1]: [partial_avg(cast(key#x as bigint))] +Aggregate Attributes [2]: [sum#x, count#xL] +Results [2]: [sum#x, count#xL] (16) Exchange -Input: [sum#x, count#xL] - +Input [2]: [sum#x, count#xL] +Arguments: SinglePartition, true, [id=#x] + (17) HashAggregate [codegen id : 2] -Input: [sum#x, count#xL] +Input [2]: [sum#x, count#xL] Keys: [] -Functions: [avg(cast(key#x as bigint))] -Aggregate Attributes: [avg(cast(key#x as bigint))#x] -Results: [avg(cast(key#x as bigint))#x AS avg(key)#x] +Functions [1]: [avg(cast(key#x as bigint))] +Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] +Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] -- !query @@ -678,8 +664,7 @@ ReadSchema: struct<> (2) ColumnarToRow [codegen id : 1] Input: [] -Arguments: None - + (3) Project [codegen id : 1] Output [1]: [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] Input: [] @@ -702,25 +687,24 @@ ReadSchema: struct (5) ColumnarToRow [codegen id : 1] Input [1]: [key#x] -Arguments: None - + (6) HashAggregate [codegen id : 1] -Input: [key#x] +Input [1]: [key#x] Keys: [] -Functions: [partial_avg(cast(key#x as bigint))] -Aggregate Attributes: [sum#x, count#xL] -Results: [sum#x, count#xL] +Functions [1]: [partial_avg(cast(key#x as bigint))] +Aggregate Attributes [2]: [sum#x, count#xL] +Results [2]: [sum#x, count#xL] (7) Exchange Input [2]: [sum#x, count#xL] Arguments: SinglePartition, true, [id=#x] - + (8) HashAggregate [codegen id : 2] -Input: [sum#x, count#xL] +Input [2]: [sum#x, count#xL] Keys: [] -Functions: [avg(cast(key#x as bigint))] -Aggregate Attributes: [avg(cast(key#x as bigint))#x] -Results: [avg(cast(key#x as bigint))#x AS avg(key)#x] +Functions [1]: [avg(cast(key#x as bigint))] +Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x] +Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x] Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] @@ -758,8 +742,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] -Arguments: None - + (3) Filter [codegen id : 2] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) @@ -777,8 +760,7 @@ ReadSchema: struct (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (7) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) @@ -790,7 +772,7 @@ Input [2]: [key#x, val#x] (9) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] - + (10) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] @@ -832,8 +814,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] -Arguments: None - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) @@ -843,37 +824,37 @@ Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] (5) HashAggregate [codegen id : 1] -Input: [key#x, val#x] -Keys: [key#x] -Functions: [partial_max(val#x)] -Aggregate Attributes: [max#x] -Results: [key#x, max#x] +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_max(val#x)] +Aggregate Attributes [1]: [max#x] +Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] - + (7) HashAggregate [codegen id : 4] -Input: [key#x, max#x] -Keys: [key#x] -Functions: [max(val#x)] -Aggregate Attributes: [max(val#x)#x] -Results: [key#x, max(val#x)#x AS max(val)#x] +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [2]: [key#x, max(val#x)#x AS max(val)#x] (8) ReusedExchange [Reuses operator id: 6] Output [2]: [key#x, max#x] (9) HashAggregate [codegen id : 3] -Input: [key#x, max#x] -Keys: [key#x] -Functions: [max(val#x)] -Aggregate Attributes: [max(val#x)#x] -Results: [key#x, max(val#x)#x AS max(val)#x] +Input [2]: [key#x, max#x] +Keys [1]: [key#x] +Functions [1]: [max(val#x)] +Aggregate Attributes [1]: [max(val#x)#x] +Results [2]: [key#x, max(val#x)#x AS max(val)#x] (10) BroadcastExchange Input [2]: [key#x, max(val)#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] - + (11) BroadcastHashJoin [codegen id : 4] Left keys [1]: [key#x] Right keys [1]: [key#x] @@ -895,15 +876,14 @@ Execute CreateViewCommand (1) (1) Execute CreateViewCommand -Arguments: None Output: [] - + (2) CreateViewCommand Arguments: `explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView - + (3) UnresolvedRelation Arguments: [explain_temp1] - + (4) Project Arguments: ['key, 'val] @@ -926,30 +906,31 @@ struct (1) Scan parquet default.explain_temp1 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp1] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) HashAggregate -Input: [key#x, val#x] +Input [2]: [key#x, val#x] Keys: [] -Functions: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))] -Aggregate Attributes: [count#xL, sum#xL, count#xL] -Results: [count#xL, sum#xL, count#xL] +Functions [3]: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))] +Aggregate Attributes [3]: [count#xL, sum#xL, count#xL] +Results [3]: [count#xL, sum#xL, count#xL] (4) Exchange -Input: [count#xL, sum#xL, count#xL] - +Input [3]: [count#xL, sum#xL, count#xL] +Arguments: SinglePartition, true, [id=#x] + (5) HashAggregate [codegen id : 2] -Input: [count#xL, sum#xL, count#xL] +Input [3]: [count#xL, sum#xL, count#xL] Keys: [] -Functions: [count(val#x), sum(cast(key#x as bigint)), count(key#x)] -Aggregate Attributes: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL] -Results: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL] +Functions [3]: [count(val#x), sum(cast(key#x as bigint)), count(key#x)] +Aggregate Attributes [3]: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL] +Results [2]: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL] -- !query @@ -969,30 +950,31 @@ ObjectHashAggregate (5) (1) Scan parquet default.explain_temp4 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) ObjectHashAggregate -Input: [key#x, val#x] -Keys: [key#x] -Functions: [partial_collect_set(val#x, 0, 0)] -Aggregate Attributes: [buf#x] -Results: [key#x, buf#x] +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_collect_set(val#x, 0, 0)] +Aggregate Attributes [1]: [buf#x] +Results [2]: [key#x, buf#x] (4) Exchange -Input: [key#x, buf#x] - +Input [2]: [key#x, buf#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + (5) ObjectHashAggregate -Input: [key#x, buf#x] -Keys: [key#x] -Functions: [collect_set(val#x, 0, 0)] -Aggregate Attributes: [collect_set(val#x, 0, 0)#x] -Results: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x] +Input [2]: [key#x, buf#x] +Keys [1]: [key#x] +Functions [1]: [collect_set(val#x, 0, 0)] +Aggregate Attributes [1]: [collect_set(val#x, 0, 0)#x] +Results [2]: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x] -- !query @@ -1014,36 +996,39 @@ SortAggregate (7) (1) Scan parquet default.explain_temp4 -Output: [key#x, val#x] +Output [2]: [key#x, val#x] Batched: true Location [not included in comparison]/{warehouse_dir}/explain_temp4] ReadSchema: struct (2) ColumnarToRow [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] + (3) Sort [codegen id : 1] -Input: [key#x, val#x] - +Input [2]: [key#x, val#x] +Arguments: [key#x ASC NULLS FIRST], false, 0 + (4) SortAggregate -Input: [key#x, val#x] -Keys: [key#x] -Functions: [partial_min(val#x)] -Aggregate Attributes: [min#x] -Results: [key#x, min#x] +Input [2]: [key#x, val#x] +Keys [1]: [key#x] +Functions [1]: [partial_min(val#x)] +Aggregate Attributes [1]: [min#x] +Results [2]: [key#x, min#x] (5) Exchange -Input: [key#x, min#x] - +Input [2]: [key#x, min#x] +Arguments: hashpartitioning(key#x, 4), true, [id=#x] + (6) Sort [codegen id : 2] -Input: [key#x, min#x] - +Input [2]: [key#x, min#x] +Arguments: [key#x ASC NULLS FIRST], false, 0 + (7) SortAggregate -Input: [key#x, min#x] -Keys: [key#x] -Functions: [min(val#x)] -Aggregate Attributes: [min(val#x)#x] -Results: [key#x, min(val#x)#x AS min(val)#x] +Input [2]: [key#x, min#x] +Keys [1]: [key#x] +Functions [1]: [min(val#x)] +Aggregate Attributes [1]: [min(val#x)#x] +Results [2]: [key#x, min(val#x)#x AS min(val)#x] -- !query From 1290cd523d6fdead5399923ea4acac450a5c2175 Mon Sep 17 00:00:00 2001 From: Eric Wu <492960551@qq.com> Date: Fri, 21 Feb 2020 11:03:59 +0800 Subject: [PATCH 3/3] Address comments and fix PySpark test --- python/pyspark/sql/dataframe.py | 3 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 16 ++-- .../spark/sql/execution/ExplainUtils.scala | 2 +- .../spark/sql/execution/SparkPlan.scala | 64 ++++++++----- .../sql-tests/results/explain.sql.out | 90 +++++++++---------- 5 files changed, 99 insertions(+), 76 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 2432b8127840b..44cb264714482 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -284,7 +284,8 @@ def explain(self, extended=None, mode=None): == Physical Plan == * Scan ExistingRDD (1) (1) Scan ExistingRDD [codegen id : 1] - Output: [age#0, name#1] + Output [2]: [age#0, name#1] + ... .. versionchanged:: 3.0.0 Added optional argument `mode` to specify the expected output format of plans. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index d0dfa3c847a00..12482667efa0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag} -import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} @@ -189,15 +188,18 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT val codegenIdStr = getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s"[codegen id : $id]").getOrElse("") val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown") + val baseStr = s"($operatorId) $nodeName $codegenIdStr" val argumentString = argString(SQLConf.get.maxToStringFields) - val result = s""" - |($operatorId) $nodeName $codegenIdStr - """.stripMargin - if (argumentString != null && !argumentString.isEmpty) { - s"""${result} |Arguments: $argumentString\n""".stripMargin + if (argumentString.nonEmpty) { + s""" + |$baseStr + |Arguments: $argumentString + """.stripMargin } else { - result + s""" + |$baseStr + """.stripMargin } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala index 8561cb18c8601..5d4309357895b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala @@ -189,7 +189,7 @@ object ExplainUtils { case iter: Iterable[_] => s"${fieldName} [${iter.size}]: ${iter.mkString("[", ", ", "]")}" case str: String if (str == null || str.isEmpty) => s"${fieldName}: None" case str: String => s"${fieldName}: ${str}" - case _ => s"${fieldName}: Unknown" + case _ => throw new IllegalArgumentException(s"Unsupported type for argument values: $values") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index ba293dbd6ee3f..f5bb554682eab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -514,14 +514,20 @@ trait LeafExecNode extends SparkPlan { override def producedAttributes: AttributeSet = outputSet override def verboseStringWithOperatorId(): String = { val argumentString = argString(SQLConf.get.maxToStringFields) - val result = s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Output", producedAttributes)} - """.stripMargin - if (argumentString != null && !argumentString.isEmpty) { - s"""${result} |Arguments: $argumentString\n""".stripMargin + val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}" + val outputStr = s"${ExplainUtils.generateFieldString("Output", producedAttributes)}" + + if (argumentString.nonEmpty) { + s""" + |$baseStr + |$outputStr + |Arguments: $argumentString + """.stripMargin } else { - s"${result}" + s""" + |$baseStr + |$outputStr + """.stripMargin } } } @@ -539,14 +545,20 @@ trait UnaryExecNode extends SparkPlan { override final def children: Seq[SparkPlan] = child :: Nil override def verboseStringWithOperatorId(): String = { val argumentString = argString(SQLConf.get.maxToStringFields) - val result = s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Input", child.output)} - """.stripMargin - if (argumentString != null && !argumentString.isEmpty) { - s"""${result} |Arguments: $argumentString\n""".stripMargin + val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}" + val inputStr = s"${ExplainUtils.generateFieldString("Input", child.output)}" + + if (argumentString.nonEmpty) { + s""" + |$baseStr + |$inputStr + |Arguments: $argumentString + """.stripMargin } else { - s"${result}" + s""" + |$baseStr + |$inputStr + """.stripMargin } } } @@ -558,15 +570,23 @@ trait BinaryExecNode extends SparkPlan { override final def children: Seq[SparkPlan] = Seq(left, right) override def verboseStringWithOperatorId(): String = { val argumentString = argString(SQLConf.get.maxToStringFields) - val result = s""" - |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} - |${ExplainUtils.generateFieldString("Left output", left.output)} - |${ExplainUtils.generateFieldString("Right output", right.output)} - """.stripMargin - if (argumentString != null && !argumentString.isEmpty) { - s"""${result} |Arguments: $argumentString\n""".stripMargin + val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}" + val leftOutputStr = s"${ExplainUtils.generateFieldString("Left output", left.output)}" + val rightOutputStr = s"${ExplainUtils.generateFieldString("Right output", right.output)}" + + if (argumentString.nonEmpty) { + s""" + |$baseStr + |$leftOutputStr + |$rightOutputStr + |Arguments: $argumentString + """.stripMargin } else { - s"${result}" + s""" + |$baseStr + |$leftOutputStr + |$rightOutputStr + """.stripMargin } } } diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index aff7f5c54f499..40b26e6fb64ce 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -73,7 +73,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) @@ -92,7 +92,7 @@ Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] - + (7) HashAggregate [codegen id : 2] Input [2]: [key#x, max#x] Keys [1]: [key#x] @@ -103,7 +103,7 @@ Results [2]: [key#x, max(val#x)#x AS max(val)#x] (8) Exchange Input [2]: [key#x, max(val)#x] Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] - + (9) Sort [codegen id : 3] Input [2]: [key#x, max(val)#x] Arguments: [key#x ASC NULLS FIRST], true, 0 @@ -140,7 +140,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) @@ -159,7 +159,7 @@ Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] - + (7) HashAggregate [codegen id : 2] Input [2]: [key#x, max#x] Keys [1]: [key#x] @@ -208,7 +208,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) @@ -226,7 +226,7 @@ ReadSchema: struct (6) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] - + (7) Filter [codegen id : 2] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 0)) @@ -236,7 +236,7 @@ Output [2]: [key#x, val#x] Input [2]: [key#x, val#x] (9) Union - + (10) HashAggregate [codegen id : 3] Input [2]: [key#x, val#x] Keys [2]: [key#x, val#x] @@ -247,7 +247,7 @@ Results [2]: [key#x, val#x] (11) Exchange Input [2]: [key#x, val#x] Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] - + (12) HashAggregate [codegen id : 4] Input [2]: [key#x, val#x] Keys [2]: [key#x, val#x] @@ -287,7 +287,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] - + (3) Filter [codegen id : 2] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) @@ -305,7 +305,7 @@ ReadSchema: struct (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (7) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) @@ -317,7 +317,7 @@ Input [2]: [key#x, val#x] (9) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] - + (10) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] @@ -352,7 +352,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] - + (3) Scan parquet default.explain_temp2 Output [2]: [key#x, val#x] Batched: true @@ -362,7 +362,7 @@ ReadSchema: struct (4) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (5) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : isnotnull(key#x) @@ -374,7 +374,7 @@ Input [2]: [key#x, val#x] (7) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] - + (8) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] @@ -411,7 +411,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) @@ -441,7 +441,7 @@ ReadSchema: struct (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (7) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2)) @@ -460,7 +460,7 @@ Results [1]: [max#x] (10) Exchange Input [1]: [max#x] Arguments: SinglePartition, true, [id=#x] - + (11) HashAggregate [codegen id : 2] Input [1]: [max#x] Keys: [] @@ -487,7 +487,7 @@ ReadSchema: struct (13) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (14) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) @@ -506,7 +506,7 @@ Results [1]: [max#x] (17) Exchange Input [1]: [max#x] Arguments: SinglePartition, true, [id=#x] - + (18) HashAggregate [codegen id : 2] Input [1]: [max#x] Keys: [] @@ -543,7 +543,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x])) @@ -569,7 +569,7 @@ ReadSchema: struct (5) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (6) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) @@ -588,7 +588,7 @@ Results [1]: [max#x] (9) Exchange Input [1]: [max#x] Arguments: SinglePartition, true, [id=#x] - + (10) HashAggregate [codegen id : 2] Input [1]: [max#x] Keys: [] @@ -615,7 +615,7 @@ ReadSchema: struct (12) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (13) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(val#x) AND (val#x > 0)) @@ -634,7 +634,7 @@ Results [2]: [sum#x, count#xL] (16) Exchange Input [2]: [sum#x, count#xL] Arguments: SinglePartition, true, [id=#x] - + (17) HashAggregate [codegen id : 2] Input [2]: [sum#x, count#xL] Keys: [] @@ -664,7 +664,7 @@ ReadSchema: struct<> (2) ColumnarToRow [codegen id : 1] Input: [] - + (3) Project [codegen id : 1] Output [1]: [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x] Input: [] @@ -687,7 +687,7 @@ ReadSchema: struct (5) ColumnarToRow [codegen id : 1] Input [1]: [key#x] - + (6) HashAggregate [codegen id : 1] Input [1]: [key#x] Keys: [] @@ -698,7 +698,7 @@ Results [2]: [sum#x, count#xL] (7) Exchange Input [2]: [sum#x, count#xL] Arguments: SinglePartition, true, [id=#x] - + (8) HashAggregate [codegen id : 2] Input [2]: [sum#x, count#xL] Keys: [] @@ -742,7 +742,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 2] Input [2]: [key#x, val#x] - + (3) Filter [codegen id : 2] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) @@ -760,7 +760,7 @@ ReadSchema: struct (6) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (7) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) @@ -772,7 +772,7 @@ Input [2]: [key#x, val#x] (9) BroadcastExchange Input [2]: [key#x, val#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] - + (10) BroadcastHashJoin [codegen id : 2] Left keys [1]: [key#x] Right keys [1]: [key#x] @@ -814,7 +814,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] Condition : (isnotnull(key#x) AND (key#x > 10)) @@ -833,7 +833,7 @@ Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] - + (7) HashAggregate [codegen id : 4] Input [2]: [key#x, max#x] Keys [1]: [key#x] @@ -854,7 +854,7 @@ Results [2]: [key#x, max(val#x)#x AS max(val)#x] (10) BroadcastExchange Input [2]: [key#x, max(val)#x] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x] - + (11) BroadcastHashJoin [codegen id : 4] Left keys [1]: [key#x] Right keys [1]: [key#x] @@ -877,13 +877,13 @@ Execute CreateViewCommand (1) (1) Execute CreateViewCommand Output: [] - + (2) CreateViewCommand Arguments: `explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView - + (3) UnresolvedRelation Arguments: [explain_temp1] - + (4) Project Arguments: ['key, 'val] @@ -913,7 +913,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) HashAggregate Input [2]: [key#x, val#x] Keys: [] @@ -924,7 +924,7 @@ Results [3]: [count#xL, sum#xL, count#xL] (4) Exchange Input [3]: [count#xL, sum#xL, count#xL] Arguments: SinglePartition, true, [id=#x] - + (5) HashAggregate [codegen id : 2] Input [3]: [count#xL, sum#xL, count#xL] Keys: [] @@ -957,7 +957,7 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) ObjectHashAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] @@ -968,7 +968,7 @@ Results [2]: [key#x, buf#x] (4) Exchange Input [2]: [key#x, buf#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] - + (5) ObjectHashAggregate Input [2]: [key#x, buf#x] Keys [1]: [key#x] @@ -1003,11 +1003,11 @@ ReadSchema: struct (2) ColumnarToRow [codegen id : 1] Input [2]: [key#x, val#x] - + (3) Sort [codegen id : 1] Input [2]: [key#x, val#x] Arguments: [key#x ASC NULLS FIRST], false, 0 - + (4) SortAggregate Input [2]: [key#x, val#x] Keys [1]: [key#x] @@ -1018,11 +1018,11 @@ Results [2]: [key#x, min#x] (5) Exchange Input [2]: [key#x, min#x] Arguments: hashpartitioning(key#x, 4), true, [id=#x] - + (6) Sort [codegen id : 2] Input [2]: [key#x, min#x] Arguments: [key#x ASC NULLS FIRST], false, 0 - + (7) SortAggregate Input [2]: [key#x, min#x] Keys [1]: [key#x]