From 94f25667360bdcc0bb83ba4dc9f01fe9feb5fc68 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Mon, 3 Nov 2014 15:47:39 +0800
Subject: [PATCH 1/6] Change Utils.exceptionString to contain the inner
 exceptions and make the error information in Web UI more friendly

---
 .../org/apache/spark/TaskEndReason.scala      |  5 ++--
 .../org/apache/spark/executor/Executor.scala  |  3 +-
 .../apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../spark/shuffle/FetchFailedException.scala  | 26 ++++++++++++++--
 .../hash/BlockStoreShuffleFetcher.scala       |  5 ++--
 .../org/apache/spark/ui/jobs/StagePage.scala  | 30 +++++++++++++++++--
 .../org/apache/spark/ui/jobs/StageTable.scala | 26 ++++++++++++++--
 .../org/apache/spark/util/JsonProtocol.scala  | 12 +++++---
 .../scala/org/apache/spark/util/Utils.scala   | 13 ++++++--
 .../apache/spark/util/JsonProtocolSuite.scala | 22 ++++++++++++--
 10 files changed, 121 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index f45b463fb6f62..b2fcfa1d45051 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -88,10 +88,11 @@ case class FetchFailed(
 case class ExceptionFailure(
     className: String,
     description: String,
-    stackTrace: Array[StackTraceElement],
+    stackTrace: String,
     metrics: Option[TaskMetrics])
   extends TaskFailedReason {
-  override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)
+
+  override def toErrorString: String = stackTrace
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e24a15f015e1c..2400c4876d951 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -257,7 +257,8 @@ private[spark] class Executor(
             m.executorRunTime = serviceTime
             m.jvmGCTime = gcTime - startGCTime
           }
-          val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
+          val reason = ExceptionFailure(t.getClass.getName, t.getMessage, Utils.exceptionString(t),
+            metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 96114c0423a9e..6fd6c46b6024b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1063,7 +1063,7 @@ class DAGScheduler(
           if (runningStages.contains(failedStage)) {
             logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
               s"due to a fetch failure from $mapStage (${mapStage.name})")
-            markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
+            markStageAsFinished(failedStage, Some(failureMessage))
             runningStages -= failedStage
           }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index 0c1b6f4defdb3..7a8055219fb71 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -32,10 +32,30 @@ private[spark] class FetchFailedException(
     shuffleId: Int,
     mapId: Int,
     reduceId: Int,
-    message: String)
-  extends Exception(message) {
+    message: String,
+    cause: Throwable)
+  extends Exception(message, cause) {
+
+  def this(
+      bmAddress: BlockManagerId,
+      shuffleId: Int,
+      mapId: Int,
+      reduceId: Int,
+      message: String) {
+    this(bmAddress, shuffleId, mapId, reduceId, message, null)
+  }
+
+  def this(
+      bmAddress: BlockManagerId,
+      shuffleId: Int,
+      mapId: Int,
+      reduceId: Int,
+      cause: Throwable) {
+    this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
+  }
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
+    Utils.exceptionString(this))
 }
 
 /**
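The behavioral core of this file is that FetchFailedException now carries its root cause, so Utils.exceptionString(this) (redefined later in this patch as a full printStackTrace rendering) reports the whole "Caused by:" chain instead of only the outer message. A minimal stand-alone sketch of the difference; the nested exception below is invented for illustration:

    import java.io.{PrintWriter, StringWriter}

    // An invented nested failure, shaped like a shuffle fetch error.
    val root = new java.io.IOException("Connection reset by peer")
    val wrapped = new Exception("Failed to fetch a shuffle block", root)

    // Old behavior: only the outer message string was forwarded.
    println(wrapped.getMessage)  // Failed to fetch a shuffle block

    // New behavior: printStackTrace walks the cause chain, so the
    // "Caused by: java.io.IOException: ..." section is preserved.
    val writer = new StringWriter()
    wrapped.printStackTrace(new PrintWriter(writer))
    println(writer.toString)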
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 0d5247f4176d4..e3e7434df45b0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -25,7 +25,7 @@ import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.{CompletionIterator, Utils}
+import org.apache.spark.util.CompletionIterator
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
         blockId match {
           case ShuffleBlockId(shufId, mapId, _) =>
             val address = statuses(mapId.toInt)._1
-            throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
-              Utils.exceptionString(e))
+            throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
           case _ =>
             throw new SparkException(
               "Failed to get block " + blockId + ", which is not a shuffle block", e)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 7cc03b7d333df..9580c2b9a488c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -22,6 +22,8 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Unparsed}
 
+import org.apache.commons.lang3.StringEscapeUtils
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
 import org.apache.spark.ui.jobs.UIData._
@@ -409,13 +411,35 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
               {diskBytesSpilledReadable}
             </td>
           }}
-          <td>
-            {errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
-          </td>
+          {errorMessageCell(errorMessage)}
         </tr>
       }
     }
 
+  private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = {
+    val error = errorMessage.getOrElse("")
+    val isMultiline = error.indexOf('\n') >= 0
+    // Display the first line by default
+    val errorSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        error.substring(0, error.indexOf('\n'))
+      } else {
+        error
+      })
+    val details = if (isMultiline) {
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{error}</pre>
+      </div>
+    } else {
+      ""
+    }
+    <td>{errorSummary}{details}</td>
+  }
+
   private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
     val totalExecutionTime = {
       if (info.gettingResultTime > 0) {
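errorMessageCell above deliberately shows only the first line of the error and hides the full trace behind a "+details" toggle, since a chained stack trace can run to dozens of lines. A stand-alone sketch of just that summary rule, with an invented error string:

    // An invented multi-line error of the kind the UI now receives.
    val error =
      "org.apache.spark.shuffle.FetchFailedException: Connection reset by peer\n" +
        "\tat org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(...)\n" +
        "Caused by: java.io.IOException: Connection reset by peer"

    // Same rule as errorMessageCell: first line only when multi-line.
    val isMultiline = error.indexOf('\n') >= 0
    val summary = if (isMultiline) error.substring(0, error.indexOf('\n')) else error
    // summary == "org.apache.spark.shuffle.FetchFailedException: Connection reset by peer"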
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 4ee7f08ab47a2..5df87dad89815 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -22,6 +22,8 @@ import scala.xml.Text
 
 import java.util.Date
 
+import org.apache.commons.lang3.StringEscapeUtils
+
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.ui.{ToolTips, UIUtils}
 import org.apache.spark.util.Utils
@@ -195,7 +197,27 @@ private[ui] class FailedStageTable(
 
   override protected def stageRow(s: StageInfo): Seq[Node] = {
     val basicColumns = super.stageRow(s)
-    val failureReason = <td valign="middle"><pre>{s.failureReason.getOrElse("")}</pre></td>
-    basicColumns ++ failureReason
+    val failureReason = s.failureReason.getOrElse("")
+    val isMultiline = failureReason.indexOf('\n') >= 0
+    // Display the first line by default
+    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        failureReason.substring(0, failureReason.indexOf('\n'))
+      } else {
+        failureReason
+      })
+    val details = if (isMultiline) {
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{failureReason}</pre>
+      </div>
+    } else {
+      ""
+    }
+    val failureReasonHtml = <td valign="middle">{failureReasonSummary}{details}</td>
+    basicColumns ++ failureReasonHtml
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f7ae1f7f334de..7f96a15fd3799 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -282,11 +282,10 @@ private[spark] object JsonProtocol {
         ("Reduce ID" -> fetchFailed.reduceId) ~
         ("Message" -> fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
-        val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
         val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
         ("Class Name" -> exceptionFailure.className) ~
         ("Description" -> exceptionFailure.description) ~
-        ("Stack Trace" -> stackTrace) ~
+        ("Full Stack Trace" -> exceptionFailure.stackTrace) ~
         ("Metrics" -> metrics)
       case ExecutorLostFailure(executorId) =>
         ("Executor ID" -> executorId)
@@ -636,9 +635,14 @@ private[spark] object JsonProtocol {
       case `exceptionFailure` =>
         val className = (json \ "Class Name").extract[String]
         val description = (json \ "Description").extract[String]
-        val stackTrace = stackTraceFromJson(json \ "Stack Trace")
+        val stackTrace = Utils.jsonOption(json \ "Full Stack Trace").map(_.extract[String]).
+          getOrElse {
+            // backward compatibility
+            val oldStackTrace = stackTraceFromJson(json \ "Stack Trace")
+            Utils.exceptionString(className, description, oldStackTrace)
+          }
         val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
-        new ExceptionFailure(className, description, stackTrace, metrics)
+        ExceptionFailure(className, description, stackTrace, metrics)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
       case `executorLostFailure` =>
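The deserializer above must accept two JSON shapes: new event logs store the rendered trace under "Full Stack Trace", while logs written before this patch only have the structured "Stack Trace" array. A sketch of that fallback using json4s, as JsonProtocol does; the field values are invented and the legacy rebuild is summarized in a comment:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    // New-style event: the trace is already one rendered string.
    val newJson = parse(
      """{"Full Stack Trace": "java.lang.Exception: boom\n\tat Foo.bar(Foo.scala:1)"}""")

    // Old-style event: structured frames and no "Full Stack Trace" key.
    val oldJson = parse(
      """{"Stack Trace": [{"Declaring Class": "Foo", "Method Name": "bar",
        |"File Name": "Foo.scala", "Line Number": 1}]}""".stripMargin)

    // Mirrors the fallback above: prefer the new field, else rebuild from the old one.
    def fullStackTrace(json: JValue): String = json \ "Full Stack Trace" match {
      case JString(s) => s
      case _ => "<rebuilt via stackTraceFromJson and Utils.exceptionString>"
    }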
+ */ + @deprecated("Use exceptionString(Throwable) instead", "1.2.0") def exceptionString( className: String, description: String, diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a91c9ddeaef36..894b6f31203f0 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -109,7 +109,8 @@ class JsonProtocolSuite extends FunSuite { // TaskEndReason val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, "Some exception") - val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None) + val exceptionFailure = ExceptionFailure("To be", "or not to be", + Utils.exceptionString(exception), None) testTaskEndReason(Success) testTaskEndReason(Resubmitted) testTaskEndReason(fetchFailed) @@ -127,6 +128,23 @@ class JsonProtocolSuite extends FunSuite { testBlockId(StreamBlockId(1, 2L)) } + test("ExceptionFailure backward compatibility") { + val exceptionFailureJson = + """{"Reason":"ExceptionFailure","Class Name":"To be","Description":"or not to be", + |"Stack Trace":[{"Declaring Class":"Apollo","Method Name":"Venus","File Name":"Mercury", + |"Line Number":42},{"Declaring Class":"Afollo","Method Name":"Vemus","File Name":"Mercurry" + |,"Line Number":420},{"Declaring Class":"Ayollo","Method Name":"Vesus","File Name":"Blackbe + |rry","Line Number":4200}]}""".stripMargin.replaceAll("\r|\n", "") + + val exception = new Exception("Out of Memory! Please restock film.") + exception.setStackTrace(stackTrace) + val expectedExceptionFailure = ExceptionFailure("To be", "or not to be", + Utils.exceptionString("To be", "or not to be", stackTrace), None) + + val exceptionFailure = JsonProtocol.taskEndReasonFromJson(parse(exceptionFailureJson)) + assertEquals(expectedExceptionFailure, exceptionFailure) + } + test("StageInfo backward compatibility") { val info = makeStageInfo(1, 2, 3, 4L, 5L) val newJson = JsonProtocol.stageInfoToJson(info) @@ -401,7 +419,7 @@ class JsonProtocolSuite extends FunSuite { case (r1: ExceptionFailure, r2: ExceptionFailure) => assert(r1.className === r2.className) assert(r1.description === r2.description) - assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals) + assert(r1.stackTrace === r2.stackTrace) assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals) case (TaskResultLost, TaskResultLost) => case (TaskKilled, TaskKilled) => From 1e50f7140ac511b8e08f70e63a0ef4b201c263ff Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 4 Nov 2014 17:30:02 +0800 Subject: [PATCH 2/6] Update as per review and increase the max height of the stack trace details --- .../resources/org/apache/spark/ui/static/webui.css | 14 ++++++++++++++ .../scala/org/apache/spark/ui/jobs/StagePage.scala | 6 ++++-- .../org/apache/spark/ui/jobs/StageTable.scala | 6 ++++-- .../main/scala/org/apache/spark/util/Utils.scala | 5 ++++- 4 files changed, 26 insertions(+), 5 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index a2220e761ac98..db57712c83503 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -120,6 +120,20 @@ pre { border: none; } +.stacktrace-details { + max-height: 300px; + overflow-y: auto; + margin: 0; + transition: max-height 0.5s ease-out, padding 
From 1e50f7140ac511b8e08f70e63a0ef4b201c263ff Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Tue, 4 Nov 2014 17:30:02 +0800
Subject: [PATCH 2/6] Update as per review and increase the max height of the
 stack trace details

---
 .../resources/org/apache/spark/ui/static/webui.css | 14 ++++++++++++++
 .../scala/org/apache/spark/ui/jobs/StagePage.scala |  6 ++++--
 .../org/apache/spark/ui/jobs/StageTable.scala      |  6 ++++--
 .../main/scala/org/apache/spark/util/Utils.scala   |  5 ++++-
 4 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index a2220e761ac98..db57712c83503 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -120,6 +120,20 @@ pre {
   border: none;
 }
 
+.stacktrace-details {
+  max-height: 300px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stacktrace-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
+
 span.expand-additional-metrics {
   cursor: pointer;
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 9580c2b9a488c..734a8fa6faee3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -427,13 +427,15 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         error
       })
     val details = if (isMultiline) {
-      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-            class="expand-details">
-        +details
-      </span> ++
-