Skip to content

Commit b8dccff

Browse files
wangyum authored and GitHub Enterprise committed
Revert "[CARMEL-7385][CARMEL-6381] Remove unnecessary sql metrics for UnionExec" (apache#129)
Reverts carmel/ebay-spark#119
1 parent 20763f1 commit b8dccff

6 files changed

Lines changed: 25 additions & 98 deletions

File tree

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 0 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -296,18 +296,6 @@ package object config {
296296
.stringConf
297297
.createWithDefaultString("file,hdfs")
298298

299-
private[spark] val EXECUTOR_METRICS_SENT_UPDATED_EXCEPT_FIRST_PART_ENABLED =
300-
ConfigBuilder("spark.executor.metrics.send.updated.exceptFistPart")
301-
.doc("Only sent updated metrics to driver side for all tasks except the first " +
302-
"partition, the first partition will send back all metrics, because some metrics " +
303-
"like sql related metrics is needed from driver side even it is zero, but only " +
304-
"one partition send back the zero metrics is good enough, that will save lots " +
305-
"of driver memory especially for union rdds, which contains lots of unused metrics " +
306-
"for each task.")
307-
.version("3.5.0")
308-
.booleanConf
309-
.createWithDefault(true)
310-
311299
private[spark] val EXECUTOR_JAVA_OPTIONS =
312300
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS)
313301
.withPrepended(SparkLauncher.EXECUTOR_DEFAULT_JAVA_OPTIONS)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,8 @@ private[spark] class DAGScheduler(
455455
prevShuffleSize.getAndAdd(currTaskShuffleSize)
456456
}
457457

458-
val event = CompletionEvent(task, reason, result, lightTaskMetrics, metricPeaks, taskInfo)
458+
val event = CompletionEvent(task, reason, result,
459+
lightAccumUpdates, lightTaskMetrics, metricPeaks, taskInfo)
459460
val stageOpt = stageIdToStage.get(task.stageId)
460461
if (stageOpt.isEmpty) {
461462
// The stage may have already finished when we get this event -- eg. maybe it was a
@@ -482,15 +483,14 @@ private[spark] class DAGScheduler(
482483
case Some(job) =>
483484
// Only update the accumulator once for each result task.
484485
if (!job.finished(rt.outputId)) {
485-
updateAccumulators(event.task, lightAccumUpdates, event.taskInfo)
486+
updateAccumulators(event)
486487
}
487488
case None => // Ignore update if task's job has finished.
488489
}
489490
case _ =>
490-
updateAccumulators(event.task, lightAccumUpdates, event.taskInfo)
491+
updateAccumulators(event)
491492
}
492-
case _: ExceptionFailure | _: TaskKilled =>
493-
updateAccumulators(event.task, lightAccumUpdates, event.taskInfo)
493+
case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)
494494
case _ =>
495495
}
496496

@@ -504,7 +504,7 @@ private[spark] class DAGScheduler(
504504

505505
val taskMetricsForDAG: TaskMetrics = taskMetricsFromAccumulators(accumUpdatesForDAG)
506506
val eventForDAGScheduler = CompletionEvent(task, reason, result,
507-
taskMetricsForDAG, metricPeaks, taskInfo)
507+
accumUpdatesForDAG, taskMetricsForDAG, metricPeaks, taskInfo)
508508
eventProcessLoop.post(eventForDAGScheduler)
509509
}
510510

@@ -1844,7 +1844,14 @@ private[spark] class DAGScheduler(
18441844
// this synchronization in case another concurrent job is checkpointing this RDD, so we get a
18451845
// consistent view of both variables.
18461846
RDDCheckpointData.synchronized {
1847-
taskBinaryBytes = serializeTaskBinaries(stage)
1847+
taskBinaryBytes = stage match {
1848+
case stage: ShuffleMapStage =>
1849+
JavaUtils.bufferToArray(
1850+
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
1851+
case stage: ResultStage =>
1852+
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
1853+
}
1854+
18481855
partitions = stage.rdd.partitions
18491856
}
18501857
} catch {
@@ -1905,17 +1912,6 @@ private[spark] class DAGScheduler(
19051912
}
19061913
}
19071914

1908-
private[scheduler] def serializeTaskBinaries(stage: Stage): Array[Byte] = {
1909-
val taskBinaries = stage match {
1910-
case stage: ShuffleMapStage =>
1911-
JavaUtils.bufferToArray(
1912-
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
1913-
case stage: ResultStage =>
1914-
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
1915-
}
1916-
taskBinaries
1917-
}
1918-
19191915
private[scheduler] def handleSubmitMissingTask(missingTask: SubmitMissingTask): Unit = {
19201916
logDebug("submitMissingTasks(" + missingTask.stage + ")")
19211917
if (missingTask.taskBinary == null) {
@@ -2017,13 +2013,11 @@ private[spark] class DAGScheduler(
20172013
* This still doesn't stop the caller from updating the accumulator outside the scheduler,
20182014
* but that's not our problem since there's nothing we can do about that.
20192015
*/
2020-
private def updateAccumulators(
2021-
task: Task[_],
2022-
accumUpdates: Seq[AccumulatorV2[_, _]],
2023-
taskInfo: TaskInfo): Unit = {
2016+
private def updateAccumulators(event: CompletionEvent): Unit = {
2017+
val task = event.task
20242018
val stage = stageIdToStage(task.stageId)
20252019

2026-
accumUpdates.foreach { updates =>
2020+
event.accumUpdates.foreach { updates =>
20272021
val id = updates.id
20282022
try {
20292023
// Find the corresponding accumulator on the driver and update it
@@ -2038,8 +2032,8 @@ private[spark] class DAGScheduler(
20382032
// To avoid UI cruft, ignore cases where value wasn't updated
20392033
if (acc.name.isDefined && !updates.isZero) {
20402034
stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
2041-
taskInfo.setAccumulables(
2042-
acc.toInfo(Some(updates.value), Some(acc.value)) +: taskInfo.accumulables)
2035+
event.taskInfo.setAccumulables(
2036+
acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
20432037
}
20442038
} catch {
20452039
case NonFatal(e) =>

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.spark._
2323
import org.apache.spark.broadcast.Broadcast
2424
import org.apache.spark.executor.TaskMetrics
2525
import org.apache.spark.rdd.RDD
26-
import org.apache.spark.util.CallSite
26+
import org.apache.spark.util.{AccumulatorV2, CallSite}
2727

2828
/**
2929
* Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
@@ -84,6 +84,7 @@ private[scheduler] case class CompletionEvent(
8484
task: Task[_],
8585
reason: TaskEndReason,
8686
result: Any,
87+
accumUpdates: Seq[AccumulatorV2[_, _]],
8788
taskMetrics: TaskMetrics,
8889
metricPeaks: Array[Long],
8990
taskInfo: TaskInfo)

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ package org.apache.spark.scheduler
1919

2020
import java.nio.ByteBuffer
2121
import java.util.Properties
22+
2223
import org.apache.spark._
2324
import org.apache.spark.executor.TaskMetrics
24-
import org.apache.spark.internal.config.{APP_CALLER_CONTEXT, EXECUTOR_METRICS_SENT_UPDATED_EXCEPT_FIRST_PART_ENABLED}
25+
import org.apache.spark.internal.config.APP_CALLER_CONTEXT
2526
import org.apache.spark.internal.plugin.PluginContainer
2627
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
2728
import org.apache.spark.metrics.MetricsSystem
@@ -209,27 +210,12 @@ private[spark] abstract class Task[T](
209210
context.taskMetrics.nonZeroInternalAccums() ++
210211
// zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
211212
// filter them out.
212-
collectExternalAccumUpdates(context.taskMetrics.externalAccums, taskFailed)
213+
context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
213214
} else {
214215
Seq.empty
215216
}
216217
}
217218

218-
private def collectExternalAccumUpdates(
219-
extAccumUpdates: Seq[AccumulatorV2[_, _]], taskFailed: Boolean): Seq[AccumulatorV2[_, _]] = {
220-
val sentOnlyUpdatedMetricsExceptFirstPart =
221-
SparkEnv.get.conf.get(EXECUTOR_METRICS_SENT_UPDATED_EXCEPT_FIRST_PART_ENABLED)
222-
extAccumUpdates.filter { a =>
223-
var filter = !taskFailed || a.countFailedValues
224-
// only send all metrics for the first part
225-
// and send only updated metrics for other partitions
226-
if (sentOnlyUpdatedMetricsExceptFirstPart && partitionId != 0) {
227-
filter = filter && !a.isZero
228-
}
229-
filter
230-
}
231-
}
232-
233219
/**
234220
* Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
235221
* code and user code to properly handle the flag. This function should be idempotent so it can

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4974,7 +4974,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
49744974
} else {
49754975
null
49764976
}
4977-
CompletionEvent(task, reason, result, taskMetrics, metricPeaks, taskInfo)
4977+
CompletionEvent(task, reason, result, allAccumUpdates, taskMetrics, metricPeaks, taskInfo)
49784978
}
49794979
}
49804980

core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -669,48 +669,6 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
669669
assert(invocationOrder === Seq("C", "B", "A", "D"))
670670
}
671671

672-
test("Only first partition updated external accumulators will be sent back to driver") {
673-
sc = new SparkContext("local", "test")
674-
// Create a dummy task. We won't end up running this; we just want to collect
675-
// accumulator updates from it.
676-
val taskMetrics1 = TaskMetrics.registered
677-
val ext1 = new LongAccumulator
678-
ext1.register(sc, Some("extAccum1"))
679-
taskMetrics1.registerAccumulator(ext1)
680-
val task1 = new Task[Int](0, 0, 0, 1, JobArtifactSet.getActiveOrDefault(sc)) {
681-
context = new TaskContextImpl(0, 0, 0, 0L, 0, 1,
682-
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
683-
new Properties,
684-
SparkEnv.get.metricsSystem,
685-
taskMetrics1)
686-
687-
override def runTask(tc: TaskContext): Int = 0
688-
}
689-
val updatedAccums = task1.collectAccumulatorUpdates()
690-
assert(updatedAccums.length == 2)
691-
assert(updatedAccums(0).name == Some(InternalAccumulator.RESULT_SIZE))
692-
assert(updatedAccums(0).value == 0)
693-
assert(updatedAccums(1).name == Some("extAccum1"))
694-
assert(updatedAccums(1).value == 0)
695-
696-
val taskMetrics2 = TaskMetrics.registered
697-
val ext2 = new LongAccumulator
698-
ext2.register(sc, Some("extAccum2"))
699-
taskMetrics2.registerAccumulator(ext2)
700-
val task2 = new Task[Int](0, 0, 1, 1, JobArtifactSet.getActiveOrDefault(sc)) {
701-
context = new TaskContextImpl(0, 0, 1, 0L, 0, 1,
702-
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
703-
new Properties,
704-
SparkEnv.get.metricsSystem,
705-
taskMetrics2)
706-
707-
override def runTask(tc: TaskContext): Int = 0
708-
}
709-
val updatedAccums2 = task2.collectAccumulatorUpdates()
710-
// external accumulators won't be send back for the second partition
711-
// when it is not updated
712-
assert(updatedAccums2.length == 1)
713-
}
714672
}
715673

716674
private object TaskContextSuite {

0 commit comments

Comments
 (0)