Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ pre {
border: none;
}

.stacktrace-details {
max-height: 300px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

.stacktrace-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}

span.expand-additional-metrics {
cursor: pointer;
}
Expand Down
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,22 @@ case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this warrants a comment in the javadoc for this class why we have stackTrace and fullStackTrace and what the difference is. Just a simple one line explanation would do.

metrics: Option[TaskMetrics])
extends TaskFailedReason {
override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)

private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics)
}

override def toErrorString: String =
if (fullStackTrace == null) {
// fullStackTrace is added in 1.2.0
// If fullStackTrace is null, use the old error string for backward compatibility
Utils.exceptionString(className, description, stackTrace)
} else {
fullStackTrace
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private[spark] class Executor(
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

// Don't forcibly exit unless the exception was inherently fatal, to avoid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ class DAGScheduler(
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
markStageAsFinished(failedStage, Some(failureMessage))
runningStages -= failedStage
}

Expand Down Expand Up @@ -1094,7 +1094,7 @@ class DAGScheduler(
handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
}

case ExceptionFailure(className, description, stackTrace, metrics) =>
case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures

case TaskResultLost =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,30 @@ private[spark] class FetchFailedException(
shuffleId: Int,
mapId: Int,
reduceId: Int,
message: String)
extends Exception(message) {
message: String,
cause: Throwable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you make this default to cause: Throwable = null then you can get rid of the first constructor

extends Exception(message, cause) {

def this(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int,
message: String) {
this(bmAddress, shuffleId, mapId, reduceId, message, null)
}

def this(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int,
cause: Throwable) {
this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
}

def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
Utils.exceptionString(this))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.util.{CompletionIterator, Utils}
import org.apache.spark.util.CompletionIterator

private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
Expand Down Expand Up @@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
Utils.exceptionString(e))
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block", e)
Expand Down
32 changes: 29 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData._
Expand Down Expand Up @@ -409,13 +411,37 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{diskBytesSpilledReadable}
</td>
}}
<td>
{errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
</td>
{errorMessageCell(errorMessage)}
</tr>
}
}

private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = {
val error = errorMessage.getOrElse("")
val isMultiline = error.indexOf('\n') >= 0
// Display the first line by default
val errorSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
error.substring(0, error.indexOf('\n'))
} else {
error
})
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{error}</pre>
</div>
// scalastyle:on
} else {
""
}
<td>{errorSummary}{details}</td>
}

private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
val totalExecutionTime = {
if (info.gettingResultTime > 0) {
Expand Down
28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.xml.Text

import java.util.Date

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -195,7 +197,29 @@ private[ui] class FailedStageTable(

override protected def stageRow(s: StageInfo): Seq[Node] = {
val basicColumns = super.stageRow(s)
val failureReason = <td valign="middle"><pre>{s.failureReason.getOrElse("")}</pre></td>
basicColumns ++ failureReason
val failureReason = s.failureReason.getOrElse("")
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
failureReason.substring(0, failureReason.indexOf('\n'))
} else {
failureReason
})
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{failureReason}</pre>
</div>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you indent this entire if case, just like how you did in StagePage?

// scalastyle:on
} else {
""
}
val failureReasonHtml = <td valign="middle">{failureReasonSummary}{details}</td>
basicColumns ++ failureReasonHtml
}
}
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ private[spark] object JsonProtocol {
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Metrics" -> metrics)
case ExecutorLostFailure(executorId) =>
("Executor ID" -> executorId)
Expand Down Expand Up @@ -637,8 +638,10 @@ private[spark] object JsonProtocol {
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
map(_.extract[String]).orNull
val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
new ExceptionFailure(className, description, stackTrace, metrics)
ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `executorLostFailure` =>
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1598,10 +1598,21 @@ private[spark] object Utils extends Logging {

/** Return a nice string representation of the exception, including the stack trace. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then maybe we should augment this to say that this stack trace recursively includes the causes of the exception

def exceptionString(e: Throwable): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
if (e == null) {
""
} else {
val stringWriter = new StringWriter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a comment here that says something like

// Use e.printStackTrace here because e.getStackTrace doesn't include the cause

e.printStackTrace(new PrintWriter(stringWriter))
stringWriter.toString
}
}

/** Return a nice string representation of the exception, including the stack trace. */
/**
* Return a nice string representation of the exception, including the stack trace.
* It's only used for backward compatibility.
* Note: deprecated because it does not include the exception's cause.
*/
@deprecated("Use exceptionString(Throwable) instead", "1.2.0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just remove this since this is not public API. No need to deprecate

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh never mind you use this somewhere else. I'm still not sure if we should deprecate this because now we'll have warning messages when we build Spark. Maybe it's sufficient to just mention in a comment that the stack trace here does not include the cause of the exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this one into ExceptionFailure because it's only used by ExceptionFailure

def exceptionString(
className: String,
description: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val taskFailedReasons = Seq(
Resubmitted,
new FetchFailed(null, 0, 0, 0, "ignored"),
new ExceptionFailure("Exception", "description", null, None),
ExceptionFailure("Exception", "description", null, null, None),
TaskResultLost,
TaskKilled,
ExecutorLostFailure("0"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class JsonProtocolSuite extends FunSuite {
// TaskEndReason
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
"Some exception")
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
val exceptionFailure = new ExceptionFailure(exception, None)
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
testTaskEndReason(fetchFailed)
Expand All @@ -127,6 +127,13 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}

test("ExceptionFailure backward compatibility") {
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null, None)
val oldEvent = JsonProtocol.taskEndReasonToJson(exceptionFailure)
.removeField({ _._1 == "Full Stack Trace" })
assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
}

test("StageInfo backward compatibility") {
val info = makeStageInfo(1, 2, 3, 4L, 5L)
val newJson = JsonProtocol.stageInfoToJson(info)
Expand Down Expand Up @@ -402,6 +409,7 @@ class JsonProtocolSuite extends FunSuite {
assert(r1.className === r2.className)
assert(r1.description === r2.description)
assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
assert(r1.fullStackTrace === r2.fullStackTrace)
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
Expand Down