@@ -26,7 +26,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{FilterExec, RangeExec, SortExec, SparkPlan, WholeStageCodegenExec}
Contributor:
unneeded change

Contributor Author:
Removed in the new commit.

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -194,10 +194,22 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
}

test("Sort metrics") {
-  // Assume the execution plan is
-  // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-  val ds = spark.range(10).sort('id)
-  testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
+  // Assume the execution plan with node id is
+  // Sort(nodeId = 0)
+  //   Exchange(nodeId = 1)
+  //     Project(nodeId = 2)
+  //       LocalTableScan(nodeId = 3)
+  // Because of SPARK-25267, ConvertToLocalRelation is disabled in the test cases of sql/core,
+  // so Project here is not collapsed into LocalTableScan.
+  val df = Seq(1, 3, 2).toDF("id").sort('id)
+  testSparkPlanMetricsWithPredicates(df, 2, Map(
+    0L -> (("Sort", Map(
+      // In SortExec, sort time is collected as nanoseconds, but it is converted and stored as
+      // milliseconds. So sort time may be 0 if sort is executed very fast.
+      "sort time total (min, med, max)" -> timingMetricAllStatsShould(_ >= 0),
+      "peak memory total (min, med, max)" -> sizeMetricAllStatsShould(_ > 0),
+      "spill size total (min, med, max)" -> sizeMetricAllStatsShould(_ >= 0))))
+  ))
}

test("SortMergeJoin metrics") {
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.metric

import java.io.File
import java.util.regex.Pattern

import scala.collection.mutable.HashMap

@@ -40,6 +41,10 @@ trait SQLMetricsTestUtils extends SQLTestUtils {

protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore

protected val bytesPattern = Pattern.compile("([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)")

protected val durationPattern = Pattern.compile("([0-9]+(\\.[0-9]+)?) (ms|s|m|h)")

/**
* Get execution metrics for the SQL execution and verify metrics values.
*
@@ -185,19 +190,105 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
    df: DataFrame,
    expectedNumOfJobs: Int,
    expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
-   val optActualMetrics = getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetrics.keySet)
+   val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
+     (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
+       (actualMetricValue: Any) => expectedMetricValue.toString === actualMetricValue)
+     )}
+   testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
}

/**
* Call `df.collect()` and verify if the collected metrics satisfy the specified predicates.
* @param df `DataFrame` to run
* @param expectedNumOfJobs number of jobs that will run
* @param expectedMetricsPredicates the expected metrics predicates. The format is
Contributor:
nit: go to 100 chars and the next line has a bad indentation

Contributor Author (@seancxmao, Dec 28, 2018):
Usually metric values are numbers, so predicates could be more natural for checking them than regular expressions, which are better suited to text matching. For simple metric values, helper functions are not needed. However, timing and size metric values are a little more complex:

  • timing metric value example: "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
  • size metric value example: "\n96.2 MB (32.1 MB, 32.1 MB, 32.1 MB)"

With the helper functions, we extract the stats (via the timingMetricStats or sizeMetricStats method) and can then apply predicates to check any of them (all stats or any single one). timingMetricAllStatsShould and sizeMetricAllStatsShould are not strictly required; they are syntactic sugar to eliminate boilerplate, since timing and size metrics are used frequently. If we want to check a single value (e.g. sum >= 0), we can provide a predicate like the one below:

timingMetricStats(_)(0)._1 >= 0
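
Spelled out as a complete predicate of type Any => Boolean, that shorthand could look like the sketch below (built on the timingMetricStats helper from this PR; the name sumIsNonNegative is just for illustration):

// Check only the sum (the first stat) of a timing metric.
val sumIsNonNegative: Any => Boolean =
  (timingMetric: Any) => timingMetricStats(timingMetric.toString).head._1 >= 0

// It could then be used in the expected-metrics map, e.g.
// "sort time total (min, med, max)" -> sumIsNonNegative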

BTW, maybe timing and size metric values should be stored in a more structured way rather than as plain text (which even includes "\n" in the values).

Contributor Author:
Yes, the indentation was not right. I have fixed it in the new commit.

Contributor:
My point is: as of now, pattern matching is enough for what we need to check, and we do not have a use case where we actually need to parse the exact values. Doing that, we can simplify this PR and considerably reduce the size of this change, so I think we should go this way. If in the future we need something like what you proposed here because we want to check the actual values, then we can introduce the methods you are suggesting. But as of now this can be skipped IMO.
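
For illustration, such a pattern-only check could look roughly like the sketch below; the regex and the matchesPattern helper are assumptions for this example, not the PR's final API (Pattern here is the java.util.regex.Pattern imported in the diff above):

// Only verify that the rendered metric string has the "<sum> (<min>, <med>, <max>)" shape,
// without parsing the individual values.
val sizeOrTimingMetricPattern: Pattern =
  Pattern.compile("""\s*\d+(\.\d+)? \S+ \(\d+(\.\d+)? \S+(, \d+(\.\d+)? \S+)*\)""")

def matchesPattern(pattern: Pattern): Any => Boolean =
  (metricValue: Any) => pattern.matcher(metricValue.toString).matches()

// e.g. "peak memory total (min, med, max)" -> matchesPattern(sizeOrTimingMetricPattern)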

Member:
This does look like a load of additional code that, I think, duplicates some existing code in Utils. Is it really necessary just to make some basic assertions about metric values?

Contributor Author:
@mgaido91 I agree. Thanks for your detailed and clear explanation. Checking metric values does make things unnecessarily complex.

@srowen As @mgaido91 said, currently it is not necessary to check metric values; pattern matching is enough, and we could eliminate these methods. As for code duplication, the methods here do not duplicate the code in Utils. Utils provides a bunch of methods to convert between strings and bytes, but the byte counts there are of type Long, whereas the byte values in metric strings are fractional (Float), e.g. 96.2 MB.

Contributor Author:
Hi, I have switched to pattern matching and also removed unnecessary helper methods in the new commit.

* `nodeId -> (operatorName, metric name -> metric value predicate)`.
*/
protected def testSparkPlanMetricsWithPredicates(
    df: DataFrame,
    expectedNumOfJobs: Int,
    expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])]): Unit = {
  val optActualMetrics =
    getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet)
  optActualMetrics.foreach { actualMetrics =>
-   assert(expectedMetrics.keySet === actualMetrics.keySet)
-   for (nodeId <- expectedMetrics.keySet) {
-     val (expectedNodeName, expectedMetricsMap) = expectedMetrics(nodeId)
+   assert(expectedMetricsPredicates.keySet === actualMetrics.keySet)
+   for (nodeId <- expectedMetricsPredicates.keySet) {
Member:
It might be a little cleaner to iterate over (key, value) pairs here and below rather than iterate over keys then get values:

for ((nodeId, (expectedNodeName, expectedMetricsPredicatesMap)) <- expectedMetricsPredicates) {

+     val (expectedNodeName, expectedMetricsPredicatesMap) = expectedMetricsPredicates(nodeId)
      val (actualNodeName, actualMetricsMap) = actualMetrics(nodeId)
      assert(expectedNodeName === actualNodeName)
-     for (metricName <- expectedMetricsMap.keySet) {
-       assert(expectedMetricsMap(metricName).toString === actualMetricsMap(metricName))
+     for (metricName <- expectedMetricsPredicatesMap.keySet) {
Member:
You can use a similar iteration over the map here that avoids the keySet and get.

Contributor Author:
Changed in the new commit.

+       assert(expectedMetricsPredicatesMap(metricName)(actualMetricsMap(metricName)))
      }
    }
  }
}

private def metricStats(metricStr: String): Seq[String] = {
  // Split a metric string like "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)" into
  // Seq("2.0 ms", "1.0 ms", "1.0 ms", "1.0 ms"), i.e. the sum followed by min, med and max.
  val sum = metricStr.substring(0, metricStr.indexOf("(")).stripPrefix("\n").stripSuffix(" ")
  val minMedMax = metricStr.substring(metricStr.indexOf("(") + 1, metricStr.indexOf(")"))
    .split(", ").toSeq
  (sum +: minMedMax)
}

private def stringToBytes(str: String): (Float, String) = {
  val matcher = bytesPattern.matcher(str)
  if (matcher.matches()) {
    (matcher.group(1).toFloat, matcher.group(3))
  } else {
    throw new NumberFormatException("Failed to parse byte string: " + str)
  }
}

private def stringToDuration(str: String): (Float, String) = {
  val matcher = durationPattern.matcher(str)
  if (matcher.matches()) {
    (matcher.group(1).toFloat, matcher.group(3))
  } else {
    throw new NumberFormatException("Failed to parse time string: " + str)
  }
}

/**
* Convert a size metric string to a sequence of stats, including sum, min, med and max in order,
* each a tuple of (value, unit).
* @param metricStr size metric string, e.g. "\n96.2 MB (32.1 MB, 32.1 MB, 32.1 MB)"
* @return A sequence of stats, e.g. ((96.2,MB), (32.1,MB), (32.1,MB), (32.1,MB))
*/
protected def sizeMetricStats(metricStr: String): Seq[(Float, String)] = {
  metricStats(metricStr).map(stringToBytes)
}

/**
* Convert a timing metric string to a sequence of stats, including sum, min, med and max in
* order, each a tuple of (value, unit).
* @param metricStr timing metric string, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
* @return A sequence of stats, e.g. ((2.0,ms), (1.0,ms), (1.0,ms), (1.0,ms))
*/
protected def timingMetricStats(metricStr: String): Seq[(Float, String)] = {
  metricStats(metricStr).map(stringToDuration)
}
Member:
Do we need to put these helper functions here? I ask because these functions are only used by test("Sort metrics") now ...

Contributor Author:
Yes, currently these functions are only used by test("Sort metrics"). What SQLMetricsSuite has been checking so far are almost all integer metrics (e.g. "number of output rows", "records read", ...). However, we should also check non-integer metrics, such as timing and size metrics. These metrics all share the "total (min, med, max)" format, so these helper functions could be used to check any of them. Please see the screenshot I posted above for more timing and size metric examples (shuffle write, shuffle read, ...).

Contributor:
I think we can actually remove them all for now. We can just check that the metrics are defined, since we are not really checking their values (the only one for which we are ensuring something is the peak memory...). I'd propose defining a testSparkPlanMetricsPattern which is basically the same as testSparkPlanMetrics, but instead of providing a value for each metric, we pass a pattern. What do you think?
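
A rough sketch of what such a method could look like (hypothetical name and signature; the PR ultimately went with predicates instead, see testSparkPlanMetricsWithPredicates earlier in the diff):

protected def testSparkPlanMetricsPattern(
    df: DataFrame,
    expectedNumOfJobs: Int,
    expectedMetricPatterns: Map[Long, (String, Map[String, Pattern])]): Unit = {
  val optActualMetrics = getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricPatterns.keySet)
  optActualMetrics.foreach { actualMetrics =>
    for ((nodeId, (expectedNodeName, patterns)) <- expectedMetricPatterns) {
      val (actualNodeName, actualMetricsMap) = actualMetrics(nodeId)
      assert(expectedNodeName === actualNodeName)
      for ((metricName, pattern) <- patterns) {
        // Check only the shape of the rendered metric string, not its parsed values.
        assert(pattern.matcher(actualMetricsMap(metricName).toString).matches())
      }
    }
  }
}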

Contributor Author (@seancxmao, Dec 28, 2018):
It's a great idea to add a method similar to testSparkPlanMetrics. Let me try. I'd like to slightly change the method name to testSparkPlanMetricsWithPredicates, since we are actually passing in predicates.

Contributor Author:
As for checking metrics, checking ">= 0" is better than just checking whether a metric is defined, because a size or timing SQLMetric could be initialized with a non-zero value, e.g. -1:

def createSizeMetric(sc: SparkContext, name: String): SQLMetric = {
  // The final result of this metric in physical operator UI may look like:
  // data size total (min, med, max):
  // 100GB (100MB, 1GB, 10GB)
  val acc = new SQLMetric(SIZE_METRIC, -1)
  acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
  acc
}

def createTimingMetric(sc: SparkContext, name: String): SQLMetric = {
  // The final result of this metric in physical operator UI may looks like:
  // duration(min, med, max):
  // 5s (800ms, 1s, 2s)
  val acc = new SQLMetric(TIMING_METRIC, -1)
  acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
  acc
}

Contributor Author (@seancxmao, Dec 28, 2018):
In a new commit, I have added SQLMetricsTestUtils#testSparkPlanMetricsWithPredicates. This way, we simply need to provide a test spec in test("Sort metrics"), which makes the test case declarative rather than procedural.

To simplify timing and size metric testing, I added 2 common predicates, timingMetricAllStatsShould and sizeMetricAllStatsShould. These could be used for other metrics as long as they are timing or size metrics.

I also modified the original testSparkPlanMetrics to make it a special case of testSparkPlanMetricsWithPredicates, where each expected metric value is converted to an equality predicate. This eliminates duplicate code, since testSparkPlanMetrics and testSparkPlanMetricsWithPredicates are otherwise almost the same.


/**
* Returns a function to check whether all stats (sum, min, med and max) of a timing metric
* satisfy the specified predicate.
* @param predicate predicate to check stats
* @return function to check all stats of a timing metric
*/
protected def timingMetricAllStatsShould(predicate: Float => Boolean): Any => Boolean = {
  (timingMetric: Any) =>
    timingMetricStats(timingMetric.toString).forall { case (duration, _) => predicate(duration) }
}

/**
* Returns a function to check whether all stats (sum, min, med and max) of a size metric satisfy
* the specified predicate.
* @param predicate predicate to check stats
* @return function to check all stats of a size metric
*/
protected def sizeMetricAllStatsShould(predicate: Float => Boolean): Any => Boolean = {
  (sizeMetric: Any) =>
    sizeMetricStats(sizeMetric.toString).forall { case (bytes, _) => predicate(bytes) }
}
}

