Skip to content

Commit 2878cdb

Browse files
committed
[SPARK-29348][SQL][FOLLOWUP] Fix slight bug on streaming example for Dataset.observe
1 parent 481fb63 commit 2878cdb

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1891,14 +1891,10 @@ class Dataset[T] private[sql](
18911891
* [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
18921892
*
18931893
* {{{
1894-
* // Observe row count (rc) and error row count (erc) in the streaming Dataset
1895-
* val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
1896-
* observed_ds.writeStream.format("...").start()
1897-
*
18981894
* // Monitor the metrics using a listener.
18991895
* spark.streams.addListener(new StreamingQueryListener() {
19001896
* override def onQueryProgress(event: QueryProgressEvent): Unit = {
1901-
* event.progress.observedMetrics.get("my_event").foreach { row =>
1897+
* event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
19021898
* // Trigger if the number of errors exceeds 5 percent
19031899
* val num_rows = row.getAs[Long]("rc")
19041900
* val num_error_rows = row.getAs[Long]("erc")
@@ -1908,7 +1904,12 @@ class Dataset[T] private[sql](
19081904
* }
19091905
* }
19101906
* }
1907+
* def onQueryStarted(event: QueryStartedEvent): Unit = {}
1908+
* def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
19111909
* })
1910+
* // Observe row count (rc) and error row count (erc) in the streaming Dataset
1911+
* val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
1912+
* observed_ds.writeStream.format("...").start()
19121913
* }}}
19131914
*
19141915
* @group typedrel

0 commit comments

Comments
 (0)