Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ pre {
border: none;
}

/* Expanded state of a stack-trace block: height is capped at 300px and taller content
   scrolls vertically. Changes to max-height and padding are animated over 0.5s. */
.stacktrace-details {
max-height: 300px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

/* Collapsed state: zero max-height with no vertical padding and no border. */
.stacktrace-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}

span.expand-additional-metrics {
cursor: pointer;
}
Expand Down
35 changes: 34 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,48 @@ case class FetchFailed(
* :: DeveloperApi ::
* Task failed due to a runtime exception. This is the most common failure case and also captures
* user program exceptions.
*
* `stackTrace` contains the stack trace of the exception itself. It still exists for backward
* compatibility. It's better to use `this(e: Throwable, metrics: Option[TaskMetrics])` to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"It's better to use", but I'll fix this when I merge it. (No action needed on your part)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

* create `ExceptionFailure` as it will handle the backward compatibility properly.
*
* `fullStackTrace` is a better representation of the stack trace because it contains the whole
* stack trace including the exception and its causes.
*/
@DeveloperApi
case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this warrants a comment in the javadoc for this class explaining why we have both stackTrace and fullStackTrace and what the difference is. Just a simple one-line explanation would do.

metrics: Option[TaskMetrics])
extends TaskFailedReason {
override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)

/**
 * Build an `ExceptionFailure` from a throwable: the class name, message and stack trace are
 * read from `e`, and `fullStackTrace` is set to `Utils.exceptionString(e)`.
 */
private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) =
  this(
    e.getClass.getName,
    e.getMessage,
    e.getStackTrace,
    Utils.exceptionString(e),
    metrics)

/**
 * Error text for this failure: `fullStackTrace` when it is set, otherwise the legacy string
 * built from `className`, `description` and `stackTrace`.
 */
override def toErrorString: String =
  // fullStackTrace is added in 1.2.0. When it is null, fall back to the old error string
  // for backward compatibility (the fallback is only evaluated in that case).
  Option(fullStackTrace).getOrElse(exceptionString(className, description, stackTrace))

/**
 * Format an exception as `className: description`, followed by one line per stack frame.
 * Note: the exception's causes are not included; this is only used for backward compatibility.
 */
private def exceptionString(
    className: String,
    description: String,
    stackTrace: Array[StackTraceElement]): String = {
  // A null description or a null stack trace is rendered as an empty string
  val message = Option(description).getOrElse("")
  val frames = Option(stackTrace).map(_.map(frame => " " + frame).mkString("\n")).getOrElse("")
  s"$className: $message\n$frames"
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private[spark] class Executor(
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

// Don't forcibly exit unless the exception was inherently fatal, to avoid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ class DAGScheduler(
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
markStageAsFinished(failedStage, Some(failureMessage))
runningStages -= failedStage
}

Expand Down Expand Up @@ -1094,7 +1094,7 @@ class DAGScheduler(
handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
}

case ExceptionFailure(className, description, stackTrace, metrics) =>
case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures

case TaskResultLost =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,21 @@ private[spark] class FetchFailedException(
shuffleId: Int,
mapId: Int,
reduceId: Int,
message: String)
extends Exception(message) {
message: String,
cause: Throwable = null)
extends Exception(message, cause) {

/**
 * Alternate constructor wrapping an underlying failure: `cause.getMessage` is used as this
 * exception's message and `cause` is passed through as its cause.
 */
def this(
    bmAddress: BlockManagerId,
    shuffleId: Int,
    mapId: Int,
    reduceId: Int,
    cause: Throwable) =
  this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)

def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
/**
 * Convert this exception into a `FetchFailed` task end reason whose message is the string
 * produced by `Utils.exceptionString(this)`.
 */
def toTaskEndReason: TaskEndReason = {
  val failureText = Utils.exceptionString(this)
  FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureText)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.util.{CompletionIterator, Utils}
import org.apache.spark.util.CompletionIterator

private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
Expand Down Expand Up @@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
Utils.exceptionString(e))
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block", e)
Expand Down
32 changes: 29 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData._
Expand Down Expand Up @@ -409,13 +411,37 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{diskBytesSpilledReadable}
</td>
}}
<td>
{errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
</td>
{errorMessageCell(errorMessage)}
</tr>
}
}

/**
 * Render the table cell holding a task's error message.
 *
 * The first line of the message is always shown. When the message spans several lines, a
 * "+details" toggle and an initially collapsed block containing the complete message are
 * appended.
 *
 * @param errorMessage the error message, if any; `None` is treated as an empty message
 * @return a single `<td>` element
 */
private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = {
  val error = errorMessage.getOrElse("")
  val firstLineBreak = error.indexOf('\n')
  val isMultiline = firstLineBreak >= 0
  // Display the first line by default. The summary is NOT HTML-escaped by hand: text embedded
  // in an XML literal is escaped when the node is serialized, so escaping it beforehand would
  // double-escape it (a "<" in the message would be displayed as the literal text "&lt;").
  val errorSummary = if (isMultiline) error.substring(0, firstLineBreak) else error
  val details = if (isMultiline) {
    // scalastyle:off
    <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
          class="expand-details">
      +details
    </span> ++
    <div class="stacktrace-details collapsed">
      <pre>{error}</pre>
    </div>
    // scalastyle:on
  } else {
    // Nothing to expand for a single-line message
    Seq.empty[Node]
  }
  <td>{errorSummary}{details}</td>
}

private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
val totalExecutionTime = {
if (info.gettingResultTime > 0) {
Expand Down
28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.xml.Text

import java.util.Date

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -195,7 +197,29 @@ private[ui] class FailedStageTable(

/**
 * Build the row for a failed stage: the columns produced by `super.stageRow` followed by a
 * "failure reason" cell.
 *
 * The cell always shows the first line of the failure reason. When the reason spans several
 * lines, a "+details" toggle and an initially collapsed block containing the complete reason
 * are appended.
 */
override protected def stageRow(s: StageInfo): Seq[Node] = {
  val basicColumns = super.stageRow(s)
  val failureReason = s.failureReason.getOrElse("")
  val firstLineBreak = failureReason.indexOf('\n')
  val isMultiline = firstLineBreak >= 0
  // Display the first line by default. The summary is NOT HTML-escaped by hand: text embedded
  // in an XML literal is escaped when the node is serialized, so escaping it beforehand would
  // double-escape it (a "<" in the reason would be displayed as the literal text "&lt;").
  val failureReasonSummary =
    if (isMultiline) failureReason.substring(0, firstLineBreak) else failureReason
  val details = if (isMultiline) {
    // scalastyle:off
    <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
          class="expand-details">
      +details
    </span> ++
    <div class="stacktrace-details collapsed">
      <pre>{failureReason}</pre>
    </div>
    // scalastyle:on
  } else {
    // Nothing to expand for a single-line reason
    Seq.empty[Node]
  }
  val failureReasonHtml = <td valign="middle">{failureReasonSummary}{details}</td>
  basicColumns ++ failureReasonHtml
}
}
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ private[spark] object JsonProtocol {
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Metrics" -> metrics)
case ExecutorLostFailure(executorId) =>
("Executor ID" -> executorId)
Expand Down Expand Up @@ -637,8 +638,10 @@ private[spark] object JsonProtocol {
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
map(_.extract[String]).orNull
val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
new ExceptionFailure(className, description, stackTrace, metrics)
ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `executorLostFailure` =>
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1596,19 +1596,19 @@ private[spark] object Utils extends Logging {
.orNull
}

/** Return a nice string representation of the exception, including the stack trace. */
/**
* Return a nice string representation of the exception. It will call "printStackTrace" to
* recursively generate the stack trace including the exception and its causes.
*/
def exceptionString(e: Throwable): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
}

/** Return a nice string representation of the exception, including the stack trace. */
def exceptionString(
className: String,
description: String,
stackTrace: Array[StackTraceElement]): String = {
val desc = if (description == null) "" else description
val st = if (stackTrace == null) "" else stackTrace.map(" " + _).mkString("\n")
s"$className: $desc\n$st"
if (e == null) {
""
} else {
// Use e.printStackTrace here because e.getStackTrace doesn't include the cause
val stringWriter = new StringWriter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a comment here that says something like

// Use e.printStackTrace here because e.getStackTrace doesn't include the cause

e.printStackTrace(new PrintWriter(stringWriter))
stringWriter.toString
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val taskFailedReasons = Seq(
Resubmitted,
new FetchFailed(null, 0, 0, 0, "ignored"),
new ExceptionFailure("Exception", "description", null, None),
ExceptionFailure("Exception", "description", null, null, None),
TaskResultLost,
TaskKilled,
ExecutorLostFailure("0"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class JsonProtocolSuite extends FunSuite {
// TaskEndReason
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
"Some exception")
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
val exceptionFailure = new ExceptionFailure(exception, None)
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
testTaskEndReason(fetchFailed)
Expand All @@ -127,6 +127,13 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}

test("ExceptionFailure backward compatibility") {
  // A failure with a null fullStackTrace, serialized and then stripped of the
  // "Full Stack Trace" field, stands in for an event that lacks that field.
  val legacyFailure = ExceptionFailure("To be", "or not to be", stackTrace, null, None)
  val jsonWithoutFullTrace = JsonProtocol.taskEndReasonToJson(legacyFailure)
    .removeField { case (fieldName, _) => fieldName == "Full Stack Trace" }
  // Reading it back must yield a failure equal to the original one
  assertEquals(legacyFailure, JsonProtocol.taskEndReasonFromJson(jsonWithoutFullTrace))
}

test("StageInfo backward compatibility") {
val info = makeStageInfo(1, 2, 3, 4L, 5L)
val newJson = JsonProtocol.stageInfoToJson(info)
Expand Down Expand Up @@ -402,6 +409,7 @@ class JsonProtocolSuite extends FunSuite {
assert(r1.className === r2.className)
assert(r1.description === r2.description)
assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
assert(r1.fullStackTrace === r2.fullStackTrace)
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
Expand Down