Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ pre {
border: none;
}

.stacktrace-details {
max-height: 300px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

.stacktrace-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}

span.expand-additional-metrics {
cursor: pointer;
}
Expand Down
26 changes: 24 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,32 @@ case class FetchFailed(
case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
stackTrace: Array[StackTraceElement], // backwards compatibility
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm this comment doesn't make sense here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah -- we do need @pwendell to sign off on the fact that we broke compatibility in the prior patches for ExecutorLostFailure and FetchFailed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will hold back on reviewing this until we verify that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to add new fields to these case classes. It only breaks compatibility for matching. It make sense to change these to not be case classes in the future so that people don't try to match them (if we are going to break most uses of matching in the future).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If adding new fields is fine, I would like to add a fullStackTrace like this:

case class ExceptionFailure(
    className: String,
    description: String,
    stackTrace: Array[StackTraceElement],
    fullStackTrace: String,
    metrics: Option[TaskMetrics])
  extends TaskFailedReason {

instead of adding a var fullStackTrace. What do you think? @aarondav

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zsxwing To be clear, adding new fields in the constructor actually breaks compatibility for case classes, because you can do things like

failure match { 
  case ExceptionFailure(className, description, stackTrace, metrics) =>
}

which would fail to compile when you add a new parameter.

However, Patrick's point was that this behavior makes extending these classes very difficult, so we should make a one-time breaking change where we get rid of the case-class-ness, and just use regular classes, where you can add new fields arbitrarily. You can even remove fields if you provide a def which fakes them.

In this patch, let's go ahead and break compatibility by adding a new field as you said. Let's additionally file a JIRA to make these normal classes instead of case classes in a later patch, and, if possible, try to get both that and this into 1.2.0 to just embrace the compatibility issues.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finished my change. Also opened a JIRA for changing case classes: https://issues.apache.org/jira/browse/SPARK-4265

metrics: Option[TaskMetrics])
extends TaskFailedReason {
override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)

/** The stack trace message with a full (recursive) stack trace. */
private var fullStackTrace: String = null

private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
this(e.getClass.getName, e.getMessage, e.getStackTrace, metrics)
fullStackTrace = Utils.exceptionString(e)
}

private[spark] def getFullStackTrace: String = fullStackTrace

private[spark] def setFullStackTrace(fullStackTrace: String): this.type = {
this.fullStackTrace = fullStackTrace
this
}

override def toErrorString: String =
if (fullStackTrace == null) {
Utils.exceptionString(className, description, stackTrace)
}
else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (...) {
} else {
}

fullStackTrace
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ private[spark] class Executor(
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

// Don't forcibly exit unless the exception was inherently fatal, to avoid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ class DAGScheduler(
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
markStageAsFinished(failedStage, Some(failureMessage))
runningStages -= failedStage
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,30 @@ private[spark] class FetchFailedException(
shuffleId: Int,
mapId: Int,
reduceId: Int,
message: String)
extends Exception(message) {
message: String,
cause: Throwable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you make this default to cause: Throwable = null then you can get rid of the first constructor

extends Exception(message, cause) {

def this(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int,
message: String) {
this(bmAddress, shuffleId, mapId, reduceId, message, null)
}

def this(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int,
cause: Throwable) {
this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
}

def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
Utils.exceptionString(this))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.util.{CompletionIterator, Utils}
import org.apache.spark.util.CompletionIterator

private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
Expand Down Expand Up @@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
Utils.exceptionString(e))
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block", e)
Expand Down
32 changes: 29 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.ui.jobs.UIData._
Expand Down Expand Up @@ -409,13 +411,37 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
{diskBytesSpilledReadable}
</td>
}}
<td>
{errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
</td>
{errorMessageCell(errorMessage)}
</tr>
}
}

private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = {
val error = errorMessage.getOrElse("")
val isMultiline = error.indexOf('\n') >= 0
// Display the first line by default
val errorSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
error.substring(0, error.indexOf('\n'))
} else {
error
})
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{error}</pre>
</div>
// scalastyle:on
} else {
""
}
<td>{errorSummary}{details}</td>
}

private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
val totalExecutionTime = {
if (info.gettingResultTime > 0) {
Expand Down
28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.xml.Text

import java.util.Date

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -195,7 +197,29 @@ private[ui] class FailedStageTable(

override protected def stageRow(s: StageInfo): Seq[Node] = {
val basicColumns = super.stageRow(s)
val failureReason = <td valign="middle"><pre>{s.failureReason.getOrElse("")}</pre></td>
basicColumns ++ failureReason
val failureReason = s.failureReason.getOrElse("")
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
failureReason.substring(0, failureReason.indexOf('\n'))
} else {
failureReason
})
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{failureReason}</pre>
</div>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you indent this entire if case, just like how you did in StagePage?

// scalastyle:on
} else {
""
}
val failureReasonHtml = <td valign="middle">{failureReasonSummary}{details}</td>
basicColumns ++ failureReasonHtml
}
}
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ private[spark] object JsonProtocol {
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.getFullStackTrace) ~
("Metrics" -> metrics)
case ExecutorLostFailure(executorId) =>
("Executor ID" -> executorId)
Expand Down Expand Up @@ -637,8 +638,11 @@ private[spark] object JsonProtocol {
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
map(_.extract[String]).orNull
val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
new ExceptionFailure(className, description, stackTrace, metrics)
ExceptionFailure(className, description, stackTrace, metrics).
setFullStackTrace(fullStackTrace)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `executorLostFailure` =>
Expand Down
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1598,10 +1598,22 @@ private[spark] object Utils extends Logging {

/** Return a nice string representation of the exception, including the stack trace. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then maybe we should augment this to say that this stack trace recursively includes the causes of the exception

def exceptionString(e: Throwable): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
if (e == null) {
""
}
else {
val stringWriter = new StringWriter()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a comment here that says something like

// Use e.printStackTrace here because e.getStackTrace doesn't include the cause

e.printStackTrace(new PrintWriter(stringWriter))
stringWriter.toString
}
}

/** Return a nice string representation of the exception, including the stack trace. */
/**
* Return a nice string representation of the exception, including the stack trace.
* It's only used for backward compatibility.
* Note: deprecated because it does not include the exception's cause.
*/
@deprecated("Use exceptionString(Throwable) instead", "1.2.0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just remove this since this is not public API. No need to deprecate

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh never mind you use this somewhere else. I'm still not sure if we should deprecate this because now we'll have warning messages when we build Spark. Maybe it's sufficient to just mention in a comment that the stack trace here does not include the cause of the exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this one into ExceptionFailure because it's only used by ExceptionFailure

def exceptionString(
className: String,
description: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class JsonProtocolSuite extends FunSuite {
// TaskEndReason
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
"Some exception")
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
val exceptionFailure = new ExceptionFailure(exception, None)
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
testTaskEndReason(fetchFailed)
Expand All @@ -127,6 +127,13 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}

test("ExceptionFailure backward compatibility") {
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
val oldEvent = JsonProtocol.taskEndReasonToJson(exceptionFailure)
.removeField({ _._1 == "Full Stack Trace" })
assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
}

test("StageInfo backward compatibility") {
val info = makeStageInfo(1, 2, 3, 4L, 5L)
val newJson = JsonProtocol.stageInfoToJson(info)
Expand Down Expand Up @@ -403,6 +410,7 @@ class JsonProtocolSuite extends FunSuite {
assert(r1.description === r2.description)
assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
assert(r1.getFullStackTrace === r2.getFullStackTrace)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
Expand Down