Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,9 @@ private[spark] object ToolTips {
dynamic allocation is enabled. The number of granted executors may exceed the limit
ephemerally when executors are being killed.
"""

val SQL_TEXT =
"""Shows 140 characters by default. Click "+more" to see more. Long texts are truncated to 1000
|characters. Left blank when the query was not issued by SQL."""
.stripMargin.replaceAll("\n", " ")
}
23 changes: 21 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ private[spark] object UIUtils extends Logging {
id: Option[String] = None,
headerClasses: Seq[String] = Seq.empty,
stripeRowsWithCss: Boolean = true,
sortable: Boolean = true): Seq[Node] = {
sortable: Boolean = true,
// If the tool tip is defined, Some(toolTipText, toolTipPosition), otherwise None.
headerToolTips: Seq[Option[(String, String)]] = Seq.empty): Seq[Node] = {

val listingTableClass = {
val _tableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED
Expand All @@ -317,6 +319,14 @@ private[spark] object UIUtils extends Logging {
}
}

def getToolTip(index: Int): Option[(String, String)] = {
if (index < headerToolTips.size) {
headerToolTips(index)
} else {
None
}
}

val newlinesInHeader = headers.exists(_.contains("\n"))
def getHeaderContent(header: String): Seq[Node] = {
if (newlinesInHeader) {
Expand All @@ -330,7 +340,16 @@ private[spark] object UIUtils extends Logging {

val headerRow: Seq[Node] = {
headers.view.zipWithIndex.map { x =>
<th width={colWidthAttr} class={getClass(x._2)}>{getHeaderContent(x._1)}</th>
val toolTipOption = getToolTip(x._2)
if (toolTipOption.isEmpty) {
<th width={colWidthAttr} class={getClass(x._2)}>{getHeaderContent(x._1)}</th>
} else {
val toolTip = toolTipOption.get
// scalastyle:off line.size.limit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you shouldn't need this, scala can handle line wrapping for Node return values

<th width={colWidthAttr} class={getClass(x._2)} data-toggle="tooltip" title={toolTip._1} data-placement={toolTip._2}>{getHeaderContent(x._1)}</th>
// scalastyle:on line.size.limit
}

}
}
<table class={listingTableClass} id={id.map(Text.apply)}>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,29 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
}

/** Creates LogicalPlan for a given SQL string. */
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
override def parsePlan(sqlText: String): LogicalPlan = {
val logicalPlan = parse(sqlText) { parser =>
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
}
}
// Record the original sql text in the top logical plan for checking in the web UI.
// Truncate the text to avoid downing browsers or web UI servers by running out of memory.
val maxLength = 1000
val suffix = " ... (truncated)"
val truncateLength = maxLength - suffix.length
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really necessary? Given the reasoning behind the max length I don't see a reason to include extra lines of code to include the suffix as part of the max length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think either way is okay. Here, I am considering keeping the text displayed (including suffix) less than 1000 chars.

val truncatedSqlText = {
if (sqlText.length <= maxLength) {
sqlText
} else {
sqlText.substring(0, truncateLength) + suffix
}
}
logicalPlan.sqlText = Some(truncatedSqlText)
logicalPlan
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The solution in this PR looks intrusive to me. If we really want to store the original sql text, we can add it into the QueryExecution. The value can be initialized when we build the QueryExecution

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out. I agree that QueryExecution is a better place for original SQL text. I have updated my code accordingly. Could you please have a look?

}

/** Get the builder (visitor) which converts a ParseTree into an AST. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
* Refreshes (or invalidates) any metadata/data cached in the plan recursively.
*/
def refresh(): Unit = children.foreach(_.refresh())

// Record the original sql text in the top logical plan for checking in the web UI.
var sqlText: Option[String] = None
Copy link
Member

@gatorsmile gatorsmile May 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using var for this PR should be avoided.

}

/**
Expand Down
5 changes: 4 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2945,7 +2945,10 @@ class Dataset[T] private[sql](

/** A convenient function to wrap a logical plan and produce a Dataset. */
@inline private def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = {
Dataset(sparkSession, logicalPlan)
val dataset: Dataset[U] = Dataset(sparkSession, logicalPlan)
// Copy the original sql text for checking in the web UI.
dataset.logicalPlan.sqlText = queryExecution.logical.sqlText
dataset
}

/** A convenient function to wrap a set based logical plan and produce a Dataset. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ object SQLExecution {

sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), queryExecution.logical.sqlText,
System.currentTimeMillis()))
try {
body
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.xml.Node
import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.internal.Logging
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}

private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {

Expand Down Expand Up @@ -60,6 +60,10 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
function clickDetail(details) {{
details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
}}
function clickMore(details) {{
details.parentNode.querySelector('.sql-abstract').classList.toggle('collapsed')
details.parentNode.querySelector('.sql-full').classList.toggle('collapsed')
}}
</script>
UIUtils.headerSparkPage("SQL", content, parent, Some(5000))
}
Expand All @@ -83,10 +87,13 @@ private[ui] abstract class ExecutionTable(

protected def header: Seq[String]

protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): Seq[Node] = {
protected def row(currentTime: Long, executionUIData: SQLExecutionUIData, showSqlText: Boolean)
: Seq[Node] = {
val submissionTime = executionUIData.submissionTime
val duration = executionUIData.completionTime.getOrElse(currentTime) - submissionTime

val sqlText = executionUIData.sqlText.getOrElse("")

val runningJobs = executionUIData.runningJobs.map { jobId =>
<a href={jobURL(jobId)}>{jobId.toString}</a><br/>
}
Expand Down Expand Up @@ -124,6 +131,11 @@ private[ui] abstract class ExecutionTable(
{failedJobs}
</td>
}}
{if (showSqlText) {
<td>
{sqlTextCell(sqlText)}
</td>
}}
</tr>
}

Expand All @@ -146,11 +158,43 @@ private[ui] abstract class ExecutionTable(
<div>{desc} {details}</div>
}

private def sqlTextCell(sqlText: String): Seq[Node] = {
// Only show a limited number of characters of sqlText by default when it is too long
val maxLength = 140
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a button to copy full query text into clipboard?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really necessary? The text can be easily highlighted and copied.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. for very long query that wouldn't fit as per this comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theres a "+more" button that expands to show the full query

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, I understand you now, if we end up truncating the text at 1000 chars as proposed then yes this may be a useful feature, but if the text is available to copy wouldn't the size issue still exist?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean that it could be too long for clipboard? I don't think that would be an issue...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @felixcheung. When I am looking at the discussions again today, I find I am more clear about your suggestions. But actually, truncating very long queries is not because they are not visually pleasant to be displayed on the page, but it may take too many resources to store and transfer such a long unlimited string. So the queries longer than 1000 chars are truncated after parsing, which will never be recovered. So the idea of clipboard may not be used to resolve it.

But still, I believe the clipboard is a good idea. I do think we could extend the limit to 10k chars instead of 1k if we add a clipboard later.


if (sqlText.length <= maxLength) {
<div>{sqlText}</div>
} else {
val sqlAbstractText = sqlText.substring(0, maxLength) + " ..."
<div>
<div class="stage-details sql-abstract">
{sqlAbstractText}
</div>
<div class="stage-details sql-full collapsed">
{sqlText}
</div>
<span onclick="clickMore(this)" class="expand-details">
+more
</span>
</div>
}
}

def toNodeSeq: Seq[Node] = {
val showSqlText = executionUIDatas.exists(_.sqlText.isDefined)
val headerFull = header ++ {if (showSqlText) Seq("SQL Text") else Seq.empty}
val sqlTextToolTip = {if (showSqlText) {
Seq(Some(ToolTips.SQL_TEXT, "top"))
} else {
Seq.empty
}}
val headerToolTips: Seq[Option[(String, String)]] = header.map(_ => None) ++ sqlTextToolTip

<div>
<h4>{tableName}</h4>
{UIUtils.listingTable[SQLExecutionUIData](
header, row(currentTime, _), executionUIDatas, id = Some(tableId))}
headerFull, row(currentTime, _, showSqlText), executionUIDatas, id = Some(tableId),
headerToolTips = headerToolTips)}
</div>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ case class SparkListenerSQLExecutionStart(
details: String,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
sqlText: Option[String],
time: Long)
extends SparkListenerEvent

Expand Down Expand Up @@ -268,7 +269,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) =>
physicalPlanDescription, sparkPlanInfo, sqlText, time) =>
val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
node.metrics.map(metric => metric.accumulatorId -> metric)
Expand All @@ -280,6 +281,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
physicalPlanDescription,
physicalPlanGraph,
sqlPlanMetrics.toMap,
sqlText,
time)
synchronized {
activeExecutions(executionId) = executionUIData
Expand Down Expand Up @@ -428,6 +430,7 @@ private[ui] class SQLExecutionUIData(
val physicalPlanDescription: String,
val physicalPlanGraph: SparkPlanGraph,
val accumulatorMetrics: Map[Long, SQLPlanMetric],
val sqlText: Option[String],
val submissionTime: Long) {

var completionTime: Option[Long] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
None,
System.currentTimeMillis()))

val executionUIData = listener.executionIdToData(0)
Expand Down Expand Up @@ -259,6 +260,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
None,
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -289,6 +291,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
None,
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -330,6 +333,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
None,
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
Expand Down Expand Up @@ -369,7 +373,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
// These are largely just boilerplate unrelated to what we're trying to test.
val df = createTestDataFrame
val executionStart = SparkListenerSQLExecutionStart(
0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), None, 0)
val stageInfo = createStageInfo(0, 0)
val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
Expand Down