@@ -26,7 +26,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{FilterExec, RangeExec, SortExec, SparkPlan, WholeStageCodegenExec}
Contributor: unneeded change

Contributor Author: Removed in the new commit.

import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -194,10 +194,20 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
}

test("Sort metrics") {
// Assume the execution plan is
// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
val ds = spark.range(10).sort('id)
testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
// Assume the execution plan with node id is
// Sort(nodeId = 0)
// Exchange(nodeId = 1)
// Project(nodeId = 2)
// LocalTableScan(nodeId = 3)
// Because of SPARK-25267, ConvertToLocalRelation is disabled in the test cases of sql/core,
// so Project here is not collapsed into LocalTableScan.
val df = Seq(1, 3, 2).toDF("id").sort('id)
testSparkPlanMetricsWithPredicates(df, 2, Map(
0L -> (("Sort", Map(
"sort time total (min, med, max)" -> checkPattern(timingMetricPattern),
"peak memory total (min, med, max)" -> checkPattern(sizeMetricPattern),
"spill size total (min, med, max)" -> checkPattern(sizeMetricPattern))))
))
}

test("SortMergeJoin metrics") {
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.metric

import java.io.File
import java.util.regex.Pattern

import scala.collection.mutable.HashMap

@@ -40,6 +41,26 @@ trait SQLMetricsTestUtils extends SQLTestUtils {

protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore

protected val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
Contributor: nit: private? And maybe move it closer to where it is used?

Contributor Author: I have inlined this in an initializer as @srowen suggested.


protected val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
Member: private? Or you can inline this in an initializer below:

protected val sizeMetricPattern = {
  val bytes = ...
  s"\\n$bytes...".r
}

Contributor: ditto

Contributor Author: I have inlined this in an initializer as @srowen suggested.


// "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
Contributor: Shall we say something more here? A line that explains what this is, and then an example; the one you have is fine IMHO.

Contributor Author: I have added more comments.

protected val sizeMetricPattern = Pattern.compile(s"\\n$bytes \\($bytes, $bytes, $bytes\\)")
Member: Add .r to the end of these strings to make them a scala.util.matching.Regex automatically. That's more idiomatic for Scala. No need to import and use Java's Pattern.

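For illustration, a minimal self-contained sketch of the suggested style; the bytes building block and the sample value are taken from this diff, and the object wrapper is just scaffolding for the example:

import scala.util.matching.Regex

object RegexStyleSketch {
  // Building block copied from the diff above.
  private val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"

  // Inlined in an initializer and compiled with .r, as suggested: no
  // java.util.regex.Pattern import is needed.
  val sizeMetricPattern: Regex = s"\\n$bytes \\($bytes, $bytes, $bytes\\)".r

  def main(args: Array[String]): Unit = {
    // A rendered size metric value, as in the comment below.
    val sample = "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
    // Regex#pattern exposes the underlying java.util.regex.Pattern.
    assert(sizeMetricPattern.pattern.matcher(sample).matches())
  }
}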

// "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
protected val timingMetricPattern =
Pattern.compile(s"\\n$duration \\($duration, $duration, $duration\\)")

/** Generate a function to check the specified pattern.
Contributor: nit: in the next line

*
* @param pattern a pattern
Contributor: not very useful, we can remove it

Contributor Author: Removed the checkPattern method.

* @return a function to check the specified pattern
*/
protected def checkPattern(pattern: Pattern): (Any => Boolean) = {
Member: Is this method really needed? The only place it's used is the very specific method for testing metrics, and that always provides a regex. Just provide a map to regexes that you check against, rather than whole predicates?

Or, consider not compiling the regexes above and keeping them as string patterns. Then the predicate you pass is just something like sizeMetricPattern.matches(_). It means compiling the regex on every check, but in this test context that's no big deal.

That would help limit the complexity of all this.

Contributor Author: I'd like to take the 2nd option.
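A minimal sketch of that second option, assuming the bytes and duration strings from the diff above are kept uncompiled; the sample values come from the comments in this file:

object StringPatternPredicates {
  val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
  val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"

  // Kept as plain strings; the regex is recompiled on every check, which is
  // no big deal in a test context.
  val sizeMetricPattern = s"\\n$bytes \\($bytes, $bytes, $bytes\\)"
  val timingMetricPattern = s"\\n$duration \\($duration, $duration, $duration\\)"

  // String#matches compiles the pattern and requires the whole input to match.
  val sizePredicate: Any => Boolean = _.toString.matches(sizeMetricPattern)
  val timingPredicate: Any => Boolean = _.toString.matches(timingMetricPattern)

  def main(args: Array[String]): Unit = {
    assert(sizePredicate("\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"))
    assert(timingPredicate("\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"))
    assert(!sizePredicate("96.2 MiB")) // must match the full "total (min, med, max)" shape
  }
}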

(in: Any) => pattern.matcher(in.toString).matches()
}

/**
* Get execution metrics for the SQL execution and verify metrics values.
*
@@ -198,6 +219,32 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
}
Contributor: We can update this to avoid code duplication and reuse testSparkPlanMetrics.

Contributor Author: I have updated testSparkPlanMetrics to invoke testSparkPlanMetricsWithPredicates to avoid code duplication.

}
}

/**
* Call `df.collect()` and verify if the collected metrics satisfy the specified predicates.
* @param df `DataFrame` to run
* @param expectedNumOfJobs number of jobs that will run
* @param expectedMetricsPredicates the expected metrics predicates. The format is
Contributor: nit: go to 100 chars, and the next line has bad indentation.

Contributor Author (@seancxmao, Dec 28, 2018): Metric values are usually numbers, so predicates can be more natural for checking them than regular expressions, which are better suited to text matching. For simple metric values, helper functions are not needed. However, timing and size metric values are a little more complex:

  • size metric value example: "\n96.2 MB (32.1 MB, 32.1 MB, 32.1 MB)"
  • timing metric value example: "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"

With helper functions, we first extract the stats (via the timingMetricStats or sizeMetricStats method) and can then apply predicates to any of them (all stats, or any single one). timingMetricAllStatsShould and sizeMetricAllStatsShould are not strictly required; they are syntactic sugar to eliminate boilerplate, since timing and size metrics are used so frequently. If we want to check a single value (e.g. sum >= 0), we can provide a predicate like:

timingMetricStats(_)(0)._1 >= 0

BTW, maybe timing and size metric values should be stored in a more structured way rather than as pure text (even with "\n" in the values).
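For illustration only, a hypothetical sketch of the stats-extraction idea described here; the helper name comes from this comment, but the implementation below is an assumption, not the PR's code:

object MetricStatsSketch {
  // Hypothetical: parse a rendered size metric value such as
  // "\n96.2 MB (32.1 MB, 32.1 MB, 32.1 MB)" into (value, unit) pairs,
  // ordered as (total, min, med, max).
  def sizeMetricStats(metricValue: Any): Seq[(Float, String)] = {
    val stat = "([0-9]+(?:\\.[0-9]+)?) ([A-Za-z]+)".r
    stat.findAllMatchIn(metricValue.toString)
      .map(m => (m.group(1).toFloat, m.group(2)))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    // A predicate in the spirit of the one quoted above: the first stat
    // (the total) should be non-negative.
    val totalNonNegative: Any => Boolean = v => sizeMetricStats(v).head._1 >= 0
    assert(totalNonNegative("\n96.2 MB (32.1 MB, 32.1 MB, 32.1 MB)"))
  }
}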

Contributor Author: Yes, the indentation was not right. I have fixed it in the new commit.

Contributor: My point is: as of now, pattern matching is enough for what we need to check, and we do not have a use case where we actually need to parse the exact values. Going that way, we can simplify this PR and considerably reduce the size of this change, so I think we should do it. If in the future we need something like what you propose here, because we want to check the actual values, we can introduce these methods then. But as of now this can be skipped IMO.

Member: This does look like a load of additional code, and I think it duplicates some existing code in Utils? Is it really necessary just to make some basic assertions about metric values?

Contributor Author: @mgaido91 I agree, thanks for your detailed and clear explanation. Checking metric values does make things unnecessarily complex.

@srowen As @mgaido91 said, it is currently not necessary to check metric values; pattern matching is enough, and we can eliminate these methods. As for code duplication, the methods here do not duplicate the code in Utils. Utils provides a bunch of methods to convert between strings and byte counts, where the byte counts are of type Long. The byte values in metric strings, however, are of type Float, e.g. 96.2 MB.

Contributor Author: Hi, I have switched to pattern matching and also removed unnecessary helper methods in the new commit.

* `nodeId -> (operatorName, metric name -> metric value predicate)`.
Contributor: nit: indent

Contributor Author: Fixed indentation.

*/
protected def testSparkPlanMetricsWithPredicates(
    df: DataFrame,
    expectedNumOfJobs: Int,
    expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])]): Unit = {
  val optActualMetrics =
    getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet)
  optActualMetrics.foreach { actualMetrics =>
    assert(expectedMetricsPredicates.keySet === actualMetrics.keySet)
    for (nodeId <- expectedMetricsPredicates.keySet) {
Member: It might be a little cleaner to iterate over (key, value) pairs here and below, rather than iterate over the keys and then get the values:

for ((nodeId, (expectedNodeName, expectedMetricsPredicatesMap)) <- expectedMetricsPredicates) {

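A self-contained sketch of that pair-wise iteration; the types mirror the expectedMetricsPredicates map in this diff, and the demo data is made up for the example:

object PairIterationSketch {
  // nodeId -> (operatorName, metricName -> predicate), as in the diff.
  type Predicates = Map[Long, (String, Map[String, Any => Boolean])]
  // nodeId -> (operatorName, metricName -> actual metric value).
  type Actuals = Map[Long, (String, Map[String, Any])]

  def check(expected: Predicates, actual: Actuals): Unit = {
    assert(expected.keySet == actual.keySet)
    // Destructure (key, value) pairs directly instead of iterating over the
    // keySet and looking each value up again.
    for ((nodeId, (expectedNodeName, predicates)) <- expected) {
      val (actualNodeName, actualValues) = actual(nodeId)
      assert(expectedNodeName == actualNodeName)
      for ((metricName, predicate) <- predicates) {
        assert(predicate(actualValues(metricName)))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val metric = "sort time total (min, med, max)"
    check(
      Map(0L -> (("Sort", Map(metric -> ((_: Any) => true))))),
      Map(0L -> (("Sort", Map(metric -> "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)")))))
  }
}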
      val (expectedNodeName, expectedMetricsPredicatesMap) = expectedMetricsPredicates(nodeId)
      val (actualNodeName, actualMetricsMap) = actualMetrics(nodeId)
      assert(expectedNodeName === actualNodeName)
      for (metricName <- expectedMetricsPredicatesMap.keySet) {
Member: You can use a similar iteration over the map here to avoid the keySet and the get.

Contributor Author: Changed in the new commit.

        assert(expectedMetricsPredicatesMap(metricName)(actualMetricsMap(metricName)))
      }
    }
  }
}
Member: Do we need to put these helper functions here? These functions are only used by test("Sort metrics") now ...

Contributor Author: Yes, currently these functions are only used by test("Sort metrics"). What SQLMetricsSuite has been checking so far are almost exclusively integer metrics (e.g. "number of output rows", "records read", ...). However, we should also check non-integer metrics, such as timing and size metrics. These metrics all share the "total (min, med, max)" format, so these helper functions could be used to check all of them. Please see the screenshot I posted above for more timing and size metric examples (shuffle write, shuffle read, ...).

Contributor: I think we can actually remove all of them for now. We can just check that the metrics are defined, since we are not really checking their values (the only one for which we ensure something is the peak memory...). I'd propose defining a testSparkPlanMetricsPattern which is basically the same as testSparkPlanMetrics, but instead of providing a value for each metric, we pass a pattern. What do you think?

Contributor Author (@seancxmao, Dec 28, 2018): It's a great idea to add a method similar to testSparkPlanMetrics. Let me try. I'd like to slightly change the method name to testSparkPlanMetricsWithPredicates, since we are actually passing in predicates.

Contributor Author: As for checking metrics, checking ">= 0" is better than just checking whether a metric is defined, because a size or timing SQLMetric can be initialized to a non-zero value, e.g. -1:

def createSizeMetric(sc: SparkContext, name: String): SQLMetric = {
  // The final result of this metric in physical operator UI may look like:
  // data size total (min, med, max):
  // 100GB (100MB, 1GB, 10GB)
  val acc = new SQLMetric(SIZE_METRIC, -1)
  acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
  acc
}

def createTimingMetric(sc: SparkContext, name: String): SQLMetric = {
  // The final result of this metric in physical operator UI may look like:
  // duration(min, med, max):
  // 5s (800ms, 1s, 2s)
  val acc = new SQLMetric(TIMING_METRIC, -1)
  acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
  acc
}

Contributor Author (@seancxmao, Dec 28, 2018): In a new commit, I have added SQLMetricsTestUtils#testSparkPlanMetricsWithPredicates. This way, we simply provide a test spec in test("Sort metrics"), which makes the test case declarative rather than procedural.

To simplify timing and size metric testing, I added two common predicates, timingMetricAllStatsShould and sizeMetricAllStatsShould. These can be used for other metrics as well, as long as they are timing or size metrics.

And I also modified the original testSparkPlanMetrics to make it a special case of testSparkPlanMetricsWithPredicates, where each expected metric value is converted to an equality predicate (see the sketch below). This eliminates duplicate code, since the two methods are otherwise almost identical.
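A sketch of that conversion, assuming it sits in SQLMetricsTestUtils next to the method shown in this diff; it mirrors the description above, not necessarily the exact committed code:

// testSparkPlanMetrics becomes a special case of
// testSparkPlanMetricsWithPredicates: each expected metric value is wrapped
// into an equality predicate.
protected def testSparkPlanMetrics(
    df: DataFrame,
    expectedNumOfJobs: Int,
    expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
  val expectedMetricsPredicates = expectedMetrics.map {
    case (nodeId, (nodeName, nodeMetrics)) =>
      nodeId -> ((nodeName, nodeMetrics.map { case (metricName, expectedValue) =>
        metricName -> ((actualValue: Any) => actualValue == expectedValue)
      }))
  }
  testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
}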

}

