Skip to content

Commit d60af8f

Browse files
committed
[SPARK-17096][SQL][STREAMING] Improve exception string reported through the StreamingQueryListener
## What changes were proposed in this pull request? Currently, the stackTrace (as `Array[StackTraceElements]`) reported through StreamingQueryListener.onQueryTerminated is useless as it has the stack trace of where StreamingQueryException is defined, not the stack trace of underlying exception. For example, if a streaming query fails because of a / by zero exception in a task, the `QueryTerminated.stackTrace` will have ``` org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:211) org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124) ``` This is basically useless, as it is location where the StreamingQueryException was defined. What we want is Here is the right way to reason about what should be posted as through StreamingQueryListener.onQueryTerminated - The actual exception could either be a SparkException, or an arbitrary exception. - SparkException reports the relevant executor stack trace of a failed task as a string in the the exception message. The `Array[StackTraceElements]` returned by `SparkException.stackTrace()` is mostly irrelevant. - For any arbitrary exception, the `Array[StackTraceElements]` returned by `exception.stackTrace()` may be relevant. - When there is an error in a streaming query, it's hard to reason whether the `Array[StackTraceElements]` is useful or not. In fact, it is not clear whether it is even useful to report the stack trace as this array of Java objects. It may be sufficient to report the strack trace as a string, along with the message. This is how Spark reported executor stra - Hence, this PR simplifies the API by removing the array `stackTrace` from `QueryTerminated`. Instead the `exception` returns a string containing the message and the stack trace of the actual underlying exception that failed the streaming query (i.e. not that of the StreamingQueryException). If anyone is interested in the actual stack trace as an array, can always access them through `streamingQuery.exception` which returns the exception object. With this change, if a streaming query fails because of a / by zero exception in a task, the `QueryTerminated.exception` will be ``` org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.ArithmeticException: / by zero at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcII$sp(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:226) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1429) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1417) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1416) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1416) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) ... ``` It contains the relevant executor stack trace. In a case non-SparkException, if the streaming source MemoryStream throws an exception, exception message will have the relevant stack trace. ``` java.lang.RuntimeException: this is the exception message at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:103) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:316) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:313) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:313) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124) ``` Note that this change in the public `QueryTerminated` class is okay as the APIs are still experimental. ## How was this patch tested? Unit tests that test whether the right information is present in the exception message reported through QueryTerminated object. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes apache#14675 from tdas/SPARK-17096.
1 parent cc97ea1 commit d60af8f

4 files changed

Lines changed: 10 additions & 14 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,7 @@ class StreamExecution(
217217
} finally {
218218
state = TERMINATED
219219
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
220-
postEvent(new QueryTerminated(
221-
this.toInfo,
222-
exception.map(_.getMessage),
223-
exception.map(_.getStackTrace.toSeq).getOrElse(Nil)))
220+
postEvent(new QueryTerminated(this.toInfo, exception.map(_.cause).map(Utils.exceptionString)))
224221
terminationLatch.countDown()
225222
}
226223
}

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
2222

2323
/**
2424
* :: Experimental ::
25-
* Exception that stopped a [[StreamingQuery]].
25+
* Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
26+
* that caused the failure.
2627
* @param query Query that caused the exception
2728
* @param message Message of this exception
2829
* @param cause Internal cause of this exception

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,5 @@ object StreamingQueryListener {
108108
@Experimental
109109
class QueryTerminated private[sql](
110110
val queryInfo: StreamingQueryInfo,
111-
val exception: Option[String],
112-
val stackTrace: Seq[StackTraceElement]) extends Event
111+
val exception: Option[String]) extends Event
113112
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
9494
assert(status.id === query.id)
9595
assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
9696
assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
97-
assert(listener.terminationStackTrace.isEmpty)
9897
assert(listener.terminationException === None)
9998
}
10099
listener.checkAsyncErrors()
@@ -147,7 +146,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
147146
}
148147
}
149148

150-
test("exception should be reported in QueryTerminated") {
149+
testQuietly("exception should be reported in QueryTerminated") {
151150
val listener = new QueryStatusCollector
152151
withListenerAdded(listener) {
153152
val input = MemoryStream[Int]
@@ -159,8 +158,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
159158
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
160159
assert(listener.terminationStatus !== null)
161160
assert(listener.terminationException.isDefined)
161+
// Make sure that the exception message reported through listener
162+
// contains the actual exception and relevant stack trace
163+
assert(!listener.terminationException.get.contains("StreamingQueryException"))
162164
assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
163-
assert(listener.terminationStackTrace.nonEmpty)
165+
assert(listener.terminationException.get.contains("StreamingQueryListenerSuite"))
164166
}
165167
)
166168
}
@@ -205,8 +207,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
205207
val exception = new RuntimeException("exception")
206208
val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
207209
queryTerminatedInfo,
208-
Some(exception.getMessage),
209-
exception.getStackTrace)
210+
Some(exception.getMessage))
210211
val json =
211212
JsonProtocol.sparkEventToJson(queryQueryTerminated)
212213
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
@@ -262,7 +263,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
262263
@volatile var startStatus: StreamingQueryInfo = null
263264
@volatile var terminationStatus: StreamingQueryInfo = null
264265
@volatile var terminationException: Option[String] = null
265-
@volatile var terminationStackTrace: Seq[StackTraceElement] = null
266266

267267
val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]
268268

@@ -296,7 +296,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
296296
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
297297
terminationStatus = queryTerminated.queryInfo
298298
terminationException = queryTerminated.exception
299-
terminationStackTrace = queryTerminated.stackTrace
300299
}
301300
asyncTestWaiter.dismiss()
302301
}

0 commit comments

Comments
 (0)