-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-11253][SQL] reset all accumulators in physical operators before execute an action #9215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
4589e66
6397cf5
778992e
4ff8912
b088c70
6923c4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,14 +17,14 @@ | |
|
|
||
| package org.apache.spark.sql.util | ||
|
|
||
| import org.apache.spark.SparkException | ||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark._ | ||
| import org.apache.spark.sql.{functions, QueryTest} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project} | ||
| import org.apache.spark.sql.execution.QueryExecution | ||
| import org.apache.spark.sql.test.SharedSQLContext | ||
|
|
||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { | ||
| import testImplicits._ | ||
| import functions._ | ||
|
|
@@ -80,4 +80,66 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { | |
| assert(metrics(0)._2.analyzed.isInstanceOf[Project]) | ||
| assert(metrics(0)._3.getMessage == e.getMessage) | ||
| } | ||
|
|
||
| test("get numRows metrics by callback") { | ||
| val metrics = ArrayBuffer.empty[Long] | ||
| val listener = new QueryExecutionListener { | ||
| // Only test successful case here, so no need to implement `onFailure` | ||
| override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} | ||
|
|
||
| override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { | ||
| metrics += qe.executedPlan.longMetric("numInputRows").value.value | ||
| } | ||
| } | ||
| sqlContext.listenerManager.register(listener) | ||
|
|
||
| val df = Seq(1 -> "a").toDF("i", "j").groupBy("i").count() | ||
| df.collect() | ||
| df.collect() | ||
| Seq(1 -> "a", 2 -> "a").toDF("i", "j").groupBy("i").count().collect() | ||
|
|
||
| assert(metrics.length == 3) | ||
| assert(metrics(0) == 1) | ||
| assert(metrics(1) == 1) | ||
| assert(metrics(2) == 2) | ||
| } | ||
|
|
||
| test("get size metrics by callback") { | ||
| val metrics = ArrayBuffer.empty[Long] | ||
| val listener = new QueryExecutionListener { | ||
| // Only test successful case here, so no need to implement `onFailure` | ||
| override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} | ||
|
|
||
| override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { | ||
| metrics += qe.executedPlan.longMetric("dataSize").value.value | ||
| val bottomAgg = qe.executedPlan.children(0).children(0) | ||
| metrics += bottomAgg.longMetric("dataSize").value.value | ||
| } | ||
| } | ||
| sqlContext.listenerManager.register(listener) | ||
|
|
||
| val sparkListener = new SaveInfoListener | ||
| sqlContext.sparkContext.addSparkListener(sparkListener) | ||
|
|
||
| val df = (1 to 100).map(i => i -> i.toString).toDF("i", "j") | ||
| df.groupBy("i").count().collect() | ||
|
|
||
| def getPeakExecutionMemory(stageId: Int): Long = { | ||
| val peakMemoryAccumulator = sparkListener.getCompletedStageInfos(stageId).accumulables | ||
| .filter(_._2.name == InternalAccumulator.PEAK_EXECUTION_MEMORY) | ||
|
|
||
| assert(peakMemoryAccumulator.size == 1) | ||
| peakMemoryAccumulator.head._2.value.toLong | ||
| } | ||
|
|
||
| assert(sparkListener.getCompletedStageInfos.length == 2) | ||
| val bottomAggDataSize = getPeakExecutionMemory(0) | ||
| val topAggDataSize = getPeakExecutionMemory(1) | ||
|
|
||
| // For this simple case, the peakExecutionMemory of a stage should be the data size of the | ||
| // aggregate operator, as we only have one memory consuming operator per stage. | ||
| assert(metrics.length == 2) | ||
| assert(metrics(0) == topAggDataSize) | ||
| assert(metrics(1) == bottomAggDataSize) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we run the same physical plan multiple times to make sure the metrics are good?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry. Just one last thing. I think we need to call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch! |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan sorry. I just realized that this method is in the critical path (when we calculate numRows). How about we remove this change and document clearly that those negative initial values will have a small impact on the sum of memory consumption?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to reuse the current codebase and didn't create a new SQLMetric (including a new MetricValue, MetricParam, etc.). Is it worth creating a new one so that we won't hurt performance for numRows?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it is probably not worth that right now. The impact of those `-1` initial values is just too small (it would take 1048576 tasks to amount to 1 MB). I think we can do that later. What do you think?