Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,59 @@ trait ProgressReporter extends Logging {
currentStatus = currentStatus.copy(isTriggerActive = false)
}

/**
* Extract statistics about stateful operators from the executed query plan.
* SPARK-19378: Still report stateOperator metrics even though no data was processed while
Copy link
Contributor

@tdas tdas Jan 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not make sense to have jira numbers in a methods scala docs. Just state what it does.

* reporting progress.
*/
private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
if (lastExecution == null) return Nil
// lastExecution could belong to one of the previous triggers if `!hasNewData`.
// Walking the plan again should be inexpensive.
val stateNodes = lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreSaveExec] => p
}
stateNodes.map { node =>
val numRowsUpdated = if (hasNewData) {
node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)
} else {
0L
}
new StateOperatorProgress(
numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
numRowsUpdated = numRowsUpdated)
}
}

/**
* Extract statistics about event time from the executed query plan.
* SPARK-19378: Still report eventTime metrics even though no data was processed while
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

* reporting progress.
*/
private def extractEventTimeStats(watermarkTs: Map[String, String]): Map[String, String] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does not make sense for this method to take this watermarkTs as a param. its not extracting event time states from watermark ts, its just appending it. Then why not just return empty map, and do the appending outside? Or do the extraction of watermark inside the function as well.

if (lastExecution == null) return watermarkTs
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
val stats = e.eventTimeStats.value
Map(
"max" -> stats.max,
"min" -> stats.min,
"avg" -> stats.avg).mapValues(formatTimestamp)
}.headOption.getOrElse(Map.empty) ++ watermarkTs
}

/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
val watermarkTimestamp =
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]

val stateOperators = extractStateOperatorMetrics(hasNewData)
val eventTimeStats = extractEventTimeStats(watermarkTimestamp)

if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
return ExecutionStats(Map.empty, stateOperators, eventTimeStats)
}

// We want to associate execution plan leaves to sources that generate them, so that we match
Expand Down Expand Up @@ -237,25 +281,6 @@ trait ProgressReporter extends Logging {
Map.empty
}

// Extract statistics about stateful operators in the query plan.
val stateNodes = lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreSaveExec] => p
}
val stateOperators = stateNodes.map { node =>
new StateOperatorProgress(
numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
}

val eventTimeStats = lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
val stats = e.eventTimeStats.value
Map(
"max" -> stats.max,
"min" -> stats.min,
"avg" -> stats.avg).mapValues(formatTimestamp)
}.headOption.getOrElse(Map.empty) ++ watermarkTimestamp

ExecutionStats(numInputRows, stateOperators, eventTimeStats)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ package org.apache.spark.sql.streaming
import java.util.UUID

import scala.collection.JavaConverters._
import scala.language.postfixOps._

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._


class StreamingQueryStatusAndProgressSuite extends StreamTest {
class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
implicit class EqualsIgnoreCRLF(source: String) {
def equalsIgnoreCRLF(target: String): Boolean = {
source.replaceAll("\r\n|\r|\n", System.lineSeparator) ===
Expand Down Expand Up @@ -171,6 +174,42 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest {
query.stop()
}
}

test("SPARK-19378: Continue reporting stateOp and eventTime metrics even if there is no data") {
import testImplicits._

withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") {
val inputData = MemoryStream[(Int, String)]

val query = inputData.toDS().toDF("value", "time")
.select('value, 'time.cast("timestamp"))
.withWatermark("time", "10 seconds")
.groupBy($"value")
.agg(count("*"))
.writeStream
.queryName("metric_continuity")
.format("memory")
.outputMode("complete")
.start()
try {
inputData.addData((1, "2017-01-26 01:00:00"), (2, "2017-01-26 01:00:02"))
query.processAllAvailable()

val progress = query.lastProgress
assert(progress.eventTime.size() > 1)
assert(progress.stateOperators.length > 0)
// Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
eventually(timeout(1 minute)) {
val nextProgress = query.lastProgress
assert(nextProgress.timestamp !== progress.timestamp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explicitly verify that this progress has no data?

assert(progress.eventTime.size() > 1)
assert(progress.stateOperators.length > 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are not verifying that that the metric values are as expected.

}
} finally {
query.stop()
}
}
}
}

object StreamingQueryStatusAndProgressSuite {
Expand Down