Skip to content

Commit c6a4e3d

Browse files
committed
[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException (branch 2.1)
## What changes were proposed in this pull request? Backport #16125 to branch 2.1. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #16153 from zsxwing/SPARK-18694-2.1.
1 parent 39759ff commit c6a4e3d

8 files changed

Lines changed: 119 additions & 25 deletions

File tree

project/MimaExcludes.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,14 @@ object MimaExcludes {
9797
// [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness.
9898
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"),
9999
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"),
100-
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=")
100+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
101+
102+
// [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
103+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
104+
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
105+
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
106+
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
107+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
101108
)
102109
}
103110

python/pyspark/sql/streaming.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from pyspark.rdd import ignore_unicode_prefix
3131
from pyspark.sql.readwriter import OptionUtils, to_str
3232
from pyspark.sql.types import *
33+
from pyspark.sql.utils import StreamingQueryException
3334

3435
__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
3536

@@ -132,6 +133,45 @@ def stop(self):
132133
"""
133134
self._jsq.stop()
134135

136+
@since(2.1)
137+
def explain(self, extended=False):
138+
"""Prints the (logical and physical) plans to the console for debugging purpose.
139+
140+
:param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
141+
142+
>>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
143+
>>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
144+
>>> sq.explain()
145+
== Physical Plan ==
146+
...
147+
>>> sq.explain(True)
148+
== Parsed Logical Plan ==
149+
...
150+
== Analyzed Logical Plan ==
151+
...
152+
== Optimized Logical Plan ==
153+
...
154+
== Physical Plan ==
155+
...
156+
>>> sq.stop()
157+
"""
158+
# Cannot call `_jsq.explain(...)` because it will print in the JVM process.
159+
# We should print it in the Python process.
160+
print(self._jsq.explainInternal(extended))
161+
162+
@since(2.1)
163+
def exception(self):
164+
"""
165+
:return: the StreamingQueryException if the query was terminated by an exception, or None.
166+
"""
167+
if self._jsq.exception().isDefined():
168+
je = self._jsq.exception().get()
169+
msg = je.toString().split(': ', 1)[1] # Drop the Java StreamingQueryException type info
170+
stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
171+
return StreamingQueryException(msg, stackTrace)
172+
else:
173+
return None
174+
135175

136176
class StreamingQueryManager(object):
137177
"""A class to manage all the :class:`StreamingQuery` StreamingQueries active.

python/pyspark/sql/tests.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,6 +1137,35 @@ def test_stream_await_termination(self):
11371137
q.stop()
11381138
shutil.rmtree(tmpPath)
11391139

1140+
def test_stream_exception(self):
1141+
sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
1142+
sq = sdf.writeStream.format('memory').queryName('query_explain').start()
1143+
try:
1144+
sq.processAllAvailable()
1145+
self.assertEqual(sq.exception(), None)
1146+
finally:
1147+
sq.stop()
1148+
1149+
from pyspark.sql.functions import col, udf
1150+
from pyspark.sql.utils import StreamingQueryException
1151+
bad_udf = udf(lambda x: 1 / 0)
1152+
sq = sdf.select(bad_udf(col("value")))\
1153+
.writeStream\
1154+
.format('memory')\
1155+
.queryName('this_query')\
1156+
.start()
1157+
try:
1158+
# Process some data to fail the query
1159+
sq.processAllAvailable()
1160+
self.fail("bad udf should fail the query")
1161+
except StreamingQueryException as e:
1162+
# This is expected
1163+
self.assertTrue("ZeroDivisionError" in e.desc)
1164+
finally:
1165+
sq.stop()
1166+
self.assertTrue(type(sq.exception()) is StreamingQueryException)
1167+
self.assertTrue("ZeroDivisionError" in sq.exception().desc)
1168+
11401169
def test_query_manager_await_termination(self):
11411170
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
11421171
for q in self.spark._wrapped.streams.active:

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class StreamExecution(
9393
* once, since the field's value may change at any time.
9494
*/
9595
@volatile
96-
protected var availableOffsets = new StreamProgress
96+
var availableOffsets = new StreamProgress
9797

9898
/** The current batchId or -1 if execution has not yet been initialized. */
9999
protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@ class StreamExecution(
263263
this,
264264
s"Query $name terminated with exception: ${e.getMessage}",
265265
e,
266-
Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
266+
committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
267+
availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
267268
logError(s"Query $name terminated with error", e)
268269
updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
269270
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
2424
* :: Experimental ::
2525
* Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
2626
* that caused the failure.
27-
* @param query Query that caused the exception
2827
* @param message Message of this exception
2928
* @param cause Internal cause of this exception
30-
* @param startOffset Starting offset (if known) of the range of data in which exception occurred
31-
* @param endOffset Ending offset (if known) of the range of data in exception occurred
29+
* @param startOffset Starting offset in json of the range of data in which exception occurred
30+
* @param endOffset Ending offset in json of the range of data in which exception occurred
3231
* @since 2.0.0
3332
*/
3433
@Experimental
35-
class StreamingQueryException private[sql](
36-
@transient val query: StreamingQuery,
34+
class StreamingQueryException private(
35+
causeString: String,
3736
val message: String,
3837
val cause: Throwable,
39-
val startOffset: Option[OffsetSeq] = None,
40-
val endOffset: Option[OffsetSeq] = None)
38+
val startOffset: String,
39+
val endOffset: String)
4140
extends Exception(message, cause) {
4241

42+
private[sql] def this(
43+
query: StreamingQuery,
44+
message: String,
45+
cause: Throwable,
46+
startOffset: String,
47+
endOffset: String) {
48+
this(
49+
// scalastyle:off
50+
s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
51+
|
52+
|${query.asInstanceOf[StreamExecution].toDebugString}
53+
""".stripMargin,
54+
// scalastyle:on
55+
message,
56+
cause,
57+
startOffset,
58+
endOffset)
59+
}
60+
4361
/** Time when the exception occurred */
4462
val time: Long = System.currentTimeMillis
4563

46-
override def toString(): String = {
47-
val causeStr =
48-
s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
49-
s"""
50-
|$causeStr
51-
|
52-
|${query.asInstanceOf[StreamExecution].toDebugString}
53-
""".stripMargin
54-
}
64+
override def toString(): String = causeString
5565
}

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
3838
class StateOperatorProgress private[sql](
3939
val numRowsTotal: Long,
4040
val numRowsUpdated: Long) {
41+
42+
/** The compact JSON representation of this progress. */
43+
def json: String = compact(render(jsonValue))
44+
45+
/** The pretty (i.e. indented) JSON representation of this progress. */
46+
def prettyJson: String = pretty(render(jsonValue))
47+
4148
private[sql] def jsonValue: JValue = {
4249
("numRowsTotal" -> JInt(numRowsTotal)) ~
4350
("numRowsUpdated" -> JInt(numRowsUpdated))

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
412412
eventually("microbatch thread not stopped after termination with failure") {
413413
assert(!currentStream.microBatchThread.isAlive)
414414
}
415-
verify(thrownException.query.eq(currentStream),
416-
s"incorrect query reference in exception")
417415
verify(currentStream.exception === Some(thrownException),
418416
s"incorrect exception returned by query.exception()")
419417

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
103103
TestAwaitTermination(ExpectException[SparkException]),
104104
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
105105
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
106-
AssertOnQuery(
107-
q => q.exception.get.startOffset.get.offsets ===
108-
q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
109-
"incorrect start offset on exception")
106+
AssertOnQuery(q => {
107+
q.exception.get.startOffset ===
108+
q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
109+
q.exception.get.endOffset ===
110+
q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
111+
}, "incorrect start offset or end offset on exception")
110112
)
111113
}
112114

0 commit comments

Comments
 (0)